diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index ece6f6d5a534f..2f6055df87804 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -42,7 +42,6 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import static org.opensearch.index.translog.transfer.FileSnapshot.TransferFileSnapshot; @@ -156,14 +155,17 @@ public boolean transferSnapshot(TransferSnapshot transferSnapshot, TranslogTrans try { if (latch.await(TRANSFER_TIMEOUT_IN_MILLIS, TimeUnit.MILLISECONDS) == false) { - Exception ex = new TimeoutException("Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete"); + Exception ex = new TranslogUploadFailedException( + "Timed out waiting for transfer of snapshot " + transferSnapshot + " to complete" + ); exceptionList.forEach(ex::addSuppressed); throw ex; } } catch (InterruptedException ex) { - exceptionList.forEach(ex::addSuppressed); + Exception exception = new TranslogUploadFailedException("Failed to upload " + transferSnapshot, ex); + exceptionList.forEach(exception::addSuppressed); Thread.currentThread().interrupt(); - throw ex; + throw exception; } if (exceptionList.isEmpty()) { TransferFileSnapshot tlogMetadata = prepareMetadata(transferSnapshot); diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java index af596e7df02c2..e34bc078896f9 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/TranslogTransferManagerTests.java @@ -10,6 +10,7 @@ import org.apache.lucene.tests.util.LuceneTestCase; import org.opensearch.action.LatchedActionListener; +import org.opensearch.common.SetOnce; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; @@ -35,6 +36,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.LinkedList; @@ -180,6 +182,93 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { assertEquals(4, fileTransferTracker.allUploaded().size()); } + public void testTransferSnapshotOnUploadTimeout() throws Exception { + doAnswer(invocationOnMock -> { + Thread.sleep(31 * 1000); + return null; + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); + FileTransferTracker fileTransferTracker = new FileTransferTracker( + new ShardId("index", "indexUUid", 0), + remoteTranslogTransferTracker + ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath, + fileTransferTracker, + remoteTranslogTransferTracker + ); + SetOnce exception = new SetOnce<>(); + translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { + @Override + public void onUploadComplete(TransferSnapshot transferSnapshot) {} + + @Override + public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { + exception.set(ex); + } + }); + assertNotNull(exception.get()); + assertTrue(exception.get() instanceof TranslogUploadFailedException); + assertEquals("Timed out waiting for transfer of snapshot test-to-string to complete", exception.get().getMessage()); + } + + public void testTransferSnapshotOnThreadInterrupt() throws Exception { + SetOnce uploadThread = new SetOnce<>(); + doAnswer(invocationOnMock -> { + uploadThread.set(new Thread(() -> { + ActionListener listener = invocationOnMock.getArgument(2); + try { + Thread.sleep(31 * 1000); + } catch (InterruptedException ignore) { + List list = new ArrayList<>(invocationOnMock.getArgument(0)); + listener.onFailure(new FileTransferException(list.get(0), ignore)); + } + })); + uploadThread.get().start(); + return null; + }).when(transferService).uploadBlobs(anySet(), anyMap(), any(ActionListener.class), any(WritePriority.class)); + FileTransferTracker fileTransferTracker = new FileTransferTracker( + new ShardId("index", "indexUUid", 0), + remoteTranslogTransferTracker + ); + TranslogTransferManager translogTransferManager = new TranslogTransferManager( + shardId, + transferService, + remoteBaseTransferPath, + fileTransferTracker, + remoteTranslogTransferTracker + ); + SetOnce exception = new SetOnce<>(); + + Thread thread = new Thread(() -> { + try { + translogTransferManager.transferSnapshot(createTransferSnapshot(), new TranslogTransferListener() { + @Override + public void onUploadComplete(TransferSnapshot transferSnapshot) {} + + @Override + public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) { + exception.set(ex); + } + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + thread.start(); + + Thread.sleep(1000); + // Interrupt the thread + thread.interrupt(); + assertBusy(() -> { + assertNotNull(exception.get()); + assertTrue(exception.get() instanceof TranslogUploadFailedException); + assertEquals("Failed to upload test-to-string", exception.get().getMessage()); + }); + uploadThread.get().interrupt(); + } + private TransferSnapshot createTransferSnapshot() { return new TransferSnapshot() { @Override @@ -232,6 +321,11 @@ public Set getTranslogFileSnapshots() { public TranslogTransferMetadata getTranslogTransferMetadata() { return new TranslogTransferMetadata(primaryTerm, generation, minTranslogGeneration, randomInt(5)); } + + @Override + public String toString() { + return "test-to-string"; + } }; }