From a1ebc6105a1686c46ab97730a791e290302a4732 Mon Sep 17 00:00:00 2001 From: vikasvb90 Date: Sun, 26 Nov 2023 18:53:11 +0530 Subject: [PATCH] Optimizations in s3 async upload flow Signed-off-by: vikasvb90 --- .../repositories/s3/S3AsyncService.java | 2 +- .../repositories/s3/S3BlobContainer.java | 51 ++++++- .../repositories/s3/S3BlobStore.java | 19 +++ .../repositories/s3/S3Repository.java | 16 +++ .../repositories/s3/S3RepositoryPlugin.java | 4 +- .../opensearch/repositories/s3/S3Service.java | 2 +- .../s3/async/AsyncPartsHandler.java | 34 +++-- .../s3/async/AsyncTransferManager.java | 20 ++- .../repositories/s3/async/UploadRequest.java | 10 +- .../s3/S3BlobContainerMockClientTests.java | 125 +++++++++++++++++- .../s3/async/AsyncTransferManagerTests.java | 8 +- 11 files changed, 263 insertions(+), 28 deletions(-) diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java index 262304029a0d3..d691cad9c9d03 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java @@ -374,7 +374,7 @@ private static IrsaCredentials buildFromEnvironment(IrsaCredentials defaults) { return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName); } - private synchronized void releaseCachedClients() { + public synchronized void releaseCachedClients() { // the clients will shutdown when they will not be used anymore for (final AmazonAsyncS3Reference clientReference : clientsCache.values()) { clientReference.decRef(); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index c1180aab0e0c7..3a55fcb0bdbcd 100644 --- 
a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -91,6 +91,7 @@ import org.opensearch.repositories.s3.async.UploadRequest; import org.opensearch.repositories.s3.utils.HttpRangeUtils; +import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; @@ -188,10 +189,38 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp writeContext.getWritePriority(), writeContext.getUploadFinalizer(), writeContext.doRemoteDataIntegrityCheck(), - writeContext.getExpectedChecksum() + writeContext.getExpectedChecksum(), + blobStore.isUploadRetryEnabled() ); try { - long partSize = blobStore.getAsyncTransferManager().calculateOptimalPartSize(writeContext.getFileSize()); + if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) { + StreamContext streamContext = SocketAccess.doPrivileged( + () -> writeContext.getStreamProvider(uploadRequest.getContentLength()) + ); + InputStreamContainer inputStream = streamContext.provideStream(0); + try { + executeMultipartUpload( + blobStore, + uploadRequest.getKey(), + inputStream.getInputStream(), + uploadRequest.getContentLength() + ); + completionListener.onResponse(null); + } catch (Exception ex) { + logger.error( + () -> new ParameterizedMessage( + "Failed to upload large file {} of size {} ", + uploadRequest.getKey(), + uploadRequest.getContentLength() + ), + ex + ); + completionListener.onFailure(ex); + } + return; + } + long partSize = blobStore.getAsyncTransferManager() + .calculateOptimalPartSize(writeContext.getFileSize(), writeContext.getWritePriority(), blobStore.isUploadRetryEnabled()); StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize)); try (AmazonAsyncS3Reference amazonS3Reference = 
SocketAccess.doPrivileged(blobStore::asyncClientReference)) { @@ -537,8 +566,14 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin PutObjectRequest putObjectRequest = putObjectRequestBuilder.build(); try (AmazonS3Reference clientReference = blobStore.clientReference()) { + final InputStream requestInputStream; + if (blobStore.isUploadRetryEnabled()) { + requestInputStream = new BufferedInputStream(input, (int) (blobSize + 1)); + } else { + requestInputStream = input; + } SocketAccess.doPrivilegedVoid( - () -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(input, blobSize)) + () -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(requestInputStream, blobSize)) ); } catch (final SdkException e) { throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e); @@ -578,6 +613,13 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256); } + final InputStream requestInputStream; + if (blobStore.isUploadRetryEnabled()) { + requestInputStream = new BufferedInputStream(input, (int) (partSize + 1)); + } else { + requestInputStream = input; + } + CreateMultipartUploadRequest createMultipartUploadRequest = createMultipartUploadRequestBuilder.build(); try (AmazonS3Reference clientReference = blobStore.clientReference()) { uploadId.set( @@ -601,10 +643,9 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName, .build(); bytesCount += uploadPartRequest.contentLength(); - final UploadPartResponse uploadResponse = SocketAccess.doPrivileged( () -> clientReference.get() - .uploadPart(uploadPartRequest, RequestBody.fromInputStream(input, uploadPartRequest.contentLength())) + .uploadPart(uploadPartRequest, RequestBody.fromInputStream(requestInputStream, uploadPartRequest.contentLength())) ); 
parts.add(CompletedPart.builder().partNumber(uploadPartRequest.partNumber()).eTag(uploadResponse.eTag()).build()); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java index e8e043357e126..fc70fbb0db00e 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java @@ -56,8 +56,10 @@ import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING; import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE; import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING; +import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD; import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING; import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING; +import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED; class S3BlobStore implements BlobStore { @@ -71,6 +73,10 @@ class S3BlobStore implements BlobStore { private volatile ByteSizeValue bufferSize; + private volatile boolean redirectLargeUploads; + + private volatile boolean uploadRetryEnabled; + private volatile boolean serverSideEncryption; private volatile ObjectCannedACL cannedACL; @@ -119,6 +125,9 @@ class S3BlobStore implements BlobStore { this.normalExecutorBuilder = normalExecutorBuilder; this.priorityExecutorBuilder = priorityExecutorBuilder; this.urgentExecutorBuilder = urgentExecutorBuilder; + // Settings to initialize blobstore with. 
+ this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings()); + this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings()); } @Override @@ -130,6 +139,8 @@ public void reload(RepositoryMetadata repositoryMetadata) { this.cannedACL = initCannedACL(CANNED_ACL_SETTING.get(repositoryMetadata.settings())); this.storageClass = initStorageClass(STORAGE_CLASS_SETTING.get(repositoryMetadata.settings())); this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings()); + this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings()); + this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings()); } @Override @@ -149,6 +160,14 @@ int getMaxRetries() { return service.settings(repositoryMetadata).maxRetries; } + public boolean isRedirectLargeUploads() { + return redirectLargeUploads; + } + + public boolean isUploadRetryEnabled() { + return uploadRetryEnabled; + } + public String bucket() { return bucket; } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index 728a99b1220a6..f7772a57c9afd 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -147,6 +147,20 @@ class S3Repository extends MeteredBlobStoreRepository { */ static final ByteSizeValue MAX_FILE_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.TB); + /** + * Whether large uploads should be redirected to the slow synchronous S3 client. + */ + static final Setting REDIRECT_LARGE_S3_UPLOAD = Setting.boolSetting( + "redirect_large_s3_upload", + true, + Setting.Property.NodeScope + ); + + /** + * Whether retries on uploads are enabled. This setting wraps the input stream in a buffered stream to enable retries. 
+ */ + static final Setting UPLOAD_RETRY_ENABLED = Setting.boolSetting("s3_upload_retry_enabled", true, Setting.Property.NodeScope); + /** * Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold, * the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and @@ -391,7 +405,9 @@ public void reload(RepositoryMetadata newRepositoryMetadata) { // Reload configs for S3RepositoryPlugin service.settings(metadata); + service.releaseCachedClients(); s3AsyncService.settings(metadata); + s3AsyncService.releaseCachedClients(); // Reload configs for S3BlobStore BlobStore blobStore = getBlobStore(); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index dd420baa970d9..e7d2a4d024e60 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -261,7 +261,9 @@ public List> getSettings() { S3ClientSettings.IDENTITY_TOKEN_FILE_SETTING, S3ClientSettings.ROLE_SESSION_NAME_SETTING, S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING, - S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING + S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING, + S3Repository.REDIRECT_LARGE_S3_UPLOAD, + S3Repository.UPLOAD_RETRY_ENABLED ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java index b1b3e19eac275..24387fb98a425 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Service.java @@ -438,7 +438,7 @@ private static IrsaCredentials 
buildFromEnviroment(IrsaCredentials defaults) { return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName); } - private synchronized void releaseCachedClients() { + public synchronized void releaseCachedClients() { // the clients will shutdown when they will not be used anymore for (final AmazonS3Reference clientReference : clientsCache.values()) { clientReference.decRef(); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java index 2bead6b588696..b4c4ed0ecaa75 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java @@ -23,7 +23,6 @@ import org.opensearch.common.StreamContext; import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.io.InputStreamContainer; -import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.repositories.s3.SocketAccess; import org.opensearch.repositories.s3.StatsMetricPublisher; import org.opensearch.repositories.s3.io.CheckedContainer; @@ -55,8 +54,8 @@ public class AsyncPartsHandler { * @param uploadId Upload Id against which multi-part is being performed * @param completedParts Reference of completed parts * @param inputStreamContainers Checksum containers - * @return list of completable futures * @param statsMetricPublisher sdk metric publisher + * @return list of completable futures * @throws IOException thrown in case of an IO error */ public static List> uploadParts( @@ -69,7 +68,8 @@ public static List> uploadParts( String uploadId, AtomicReferenceArray completedParts, AtomicReferenceArray inputStreamContainers, - StatsMetricPublisher statsMetricPublisher + StatsMetricPublisher statsMetricPublisher, + boolean uploadRetryEnabled ) throws IOException { List> 
futures = new ArrayList<>(); for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) { @@ -95,7 +95,8 @@ public static List> uploadParts( futures, uploadPartRequestBuilder.build(), inputStreamContainer, - uploadRequest + uploadRequest, + uploadRetryEnabled ); } @@ -132,6 +133,18 @@ public static void cleanUpParts(S3AsyncClient s3AsyncClient, UploadRequest uploa })); } + public static InputStream maybeRetryInputStream( + InputStream inputStream, + WritePriority writePriority, + boolean uploadRetryEnabled, + long contentLength + ) { + if (uploadRetryEnabled == true && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) { + return new BufferedInputStream(inputStream, (int) (contentLength + 1)); + } + return inputStream; + } + private static void uploadPart( S3AsyncClient s3AsyncClient, ExecutorService executorService, @@ -142,7 +155,8 @@ private static void uploadPart( List> futures, UploadPartRequest uploadPartRequest, InputStreamContainer inputStreamContainer, - UploadRequest uploadRequest + UploadRequest uploadRequest, + boolean uploadRetryEnabled ) { Integer partNumber = uploadPartRequest.partNumber(); @@ -154,9 +168,13 @@ private static void uploadPart( } else { streamReadExecutor = executorService; } - // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered - // data can be retried instead of retrying whole file by the application. 
- InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)); + + InputStream inputStream = maybeRetryInputStream( + inputStreamContainer.getInputStream(), + uploadRequest.getWritePriority(), + uploadRetryEnabled, + uploadPartRequest.contentLength() + ); CompletableFuture uploadPartResponseFuture = SocketAccess.doPrivileged( () -> s3AsyncClient.uploadPart( uploadPartRequest, diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index 46fbdd3d0487b..2259780c95276 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -34,11 +34,11 @@ import org.opensearch.common.io.InputStreamContainer; import org.opensearch.common.util.ByteUtils; import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.repositories.s3.SocketAccess; import org.opensearch.repositories.s3.StatsMetricPublisher; import org.opensearch.repositories.s3.io.CheckedContainer; -import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; @@ -183,7 +183,8 @@ private void doUploadInParts( uploadId, completedParts, inputStreamContainers, - statsMetricPublisher + statsMetricPublisher, + uploadRequest.isUploadRetryEnabled() ); } catch (Exception ex) { try { @@ -302,10 +303,13 @@ private static void handleException(CompletableFuture returnFuture, Suppli /** * Calculates the optimal part size of each part request if the upload operation is carried out as multipart upload. 
*/ - public long calculateOptimalPartSize(long contentLengthOfSource) { + public long calculateOptimalPartSize(long contentLengthOfSource, WritePriority writePriority, boolean uploadRetryEnabled) { if (contentLengthOfSource < ByteSizeUnit.MB.toBytes(5)) { return contentLengthOfSource; } + if (uploadRetryEnabled && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) { + return new ByteSizeValue(5, ByteSizeUnit.MB).getBytes(); + } double optimalPartSize = contentLengthOfSource / (double) MAX_UPLOAD_PARTS; optimalPartSize = Math.ceil(optimalPartSize); return (long) Math.max(optimalPartSize, minimumPartSize); @@ -335,9 +339,13 @@ private void uploadInOneChunk( } else { streamReadExecutor = executorService; } - // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered - // data can be retried instead of retrying whole file by the application. - InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)); + + InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream( + inputStreamContainer.getInputStream(), + uploadRequest.getWritePriority(), + uploadRequest.isUploadRetryEnabled(), + uploadRequest.getContentLength() + ); CompletableFuture putObjectFuture = SocketAccess.doPrivileged( () -> s3AsyncClient.putObject( putObjectRequestBuilder.build(), diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java index 3804c8417eb9f..a5304dc4a97d6 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/UploadRequest.java @@ -25,6 +25,8 @@ public class UploadRequest { private final boolean doRemoteDataIntegrityCheck; private final Long expectedChecksum; + private boolean 
uploadRetryEnabled; + /** * Construct a new UploadRequest object * @@ -43,7 +45,8 @@ public UploadRequest( WritePriority writePriority, CheckedConsumer uploadFinalizer, boolean doRemoteDataIntegrityCheck, - Long expectedChecksum + Long expectedChecksum, + boolean uploadRetryEnabled ) { this.bucket = bucket; this.key = key; @@ -52,6 +55,7 @@ public UploadRequest( this.uploadFinalizer = uploadFinalizer; this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck; this.expectedChecksum = expectedChecksum; + this.uploadRetryEnabled = uploadRetryEnabled; } public String getBucket() { @@ -81,4 +85,8 @@ public boolean doRemoteDataIntegrityCheck() { public Long getExpectedChecksum() { return expectedChecksum; } + + public boolean isUploadRetryEnabled() { + return uploadRetryEnabled; + } } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 7c67519f2f3b0..8c7e196d7c812 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -9,7 +9,10 @@ package org.opensearch.repositories.s3; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; @@ -18,8 +21,10 @@ import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import 
software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectResponse; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; +import software.amazon.awssdk.services.s3.model.StorageClass; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; @@ -37,6 +42,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; @@ -61,15 +67,21 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class S3BlobContainerMockClientTests extends OpenSearchTestCase implements ConfigPathSupport { @@ -516,7 +528,7 @@ public InputStreamContainer apply(Integer partNo, Long size, Long position) thro } }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, 
partSize)); } - }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { + }, blobSize, false, WritePriority.NORMAL, uploadSuccess -> { assertTrue(uploadSuccess); if (throwExceptionOnFinalizeUpload) { throw new RuntimeException(); @@ -546,4 +558,115 @@ private long calculateLastPartSize(long totalSize, long partSize) { private int calculateNumberOfParts(long contentLength, long partSize) { return (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1); } + + public void testFailureWhenLargeFileRedirected() throws IOException, ExecutionException, InterruptedException { + testLargeFilesRedirectedToSlowSyncClient(true); + } + + public void testLargeFileRedirected() throws IOException, ExecutionException, InterruptedException { + testLargeFilesRedirectedToSlowSyncClient(false); + } + + private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) throws IOException, InterruptedException { + final ByteSizeValue partSize = new ByteSizeValue(1024, ByteSizeUnit.MB); + + int numberOfParts = 20; + final long lastPartSize = new ByteSizeValue(20, ByteSizeUnit.MB).getBytes(); + final long blobSize = ((numberOfParts - 1) * partSize.getBytes()) + lastPartSize; + CountDownLatch countDownLatch = new CountDownLatch(1); + AtomicReference exceptionRef = new AtomicReference<>(); + ActionListener completionListener = ActionListener.wrap(resp -> { countDownLatch.countDown(); }, ex -> { + exceptionRef.set(ex); + countDownLatch.countDown(); + }); + + final String bucketName = randomAlphaOfLengthBetween(1, 10); + + final BlobPath blobPath = new BlobPath(); + if (randomBoolean()) { + IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); + } + + final long bufferSize = ByteSizeUnit.MB.toBytes(randomIntBetween(5, 1024)); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new 
StatsMetricPublisher()); + when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize); + + final boolean serverSideEncryption = randomBoolean(); + when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption); + + final StorageClass storageClass = randomFrom(StorageClass.values()); + when(blobStore.getStorageClass()).thenReturn(storageClass); + when(blobStore.isRedirectLargeUploads()).thenReturn(true); + + final ObjectCannedACL cannedAccessControlList = randomBoolean() ? randomFrom(ObjectCannedACL.values()) : null; + if (cannedAccessControlList != null) { + when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList); + } + + final S3Client client = mock(S3Client.class); + final AmazonS3Reference clientReference = Mockito.spy(new AmazonS3Reference(client)); + doNothing().when(clientReference).close(); + when(blobStore.clientReference()).thenReturn(clientReference); + final CreateMultipartUploadResponse createMultipartUploadResponse = CreateMultipartUploadResponse.builder() + .uploadId(randomAlphaOfLength(10)) + .build(); + when(client.createMultipartUpload(any(CreateMultipartUploadRequest.class))).thenReturn(createMultipartUploadResponse); + if (expectException) { + when(client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))).thenThrow( + SdkException.create("Expected upload part request to fail", new RuntimeException()) + ); + } else { + when(client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))).thenReturn(UploadPartResponse.builder().build()); + } + + // Stub the completion and abort requests to return successfully; only the upload-part stub above may throw + when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn( + CompleteMultipartUploadResponse.builder().build() + ); + when(client.abortMultipartUpload(any(AbortMultipartUploadRequest.class))).thenReturn( + AbortMultipartUploadResponse.builder().build() + ); + + List openInputStreams = new ArrayList<>(); + final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore)); + 
s3BlobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() { + @Override + public StreamContext supplyStreamContext(long partSize) { + return new StreamContext(new CheckedTriFunction() { + @Override + public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { + InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); + openInputStreams.add(inputStream); + return new InputStreamContainer(inputStream, size, position); + } + }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); + } + }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener); + + assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); + if (expectException) { + assertNotNull(exceptionRef.get()); + } else { + assertNull(exceptionRef.get()); + } + verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong()); + + if (expectException) { + verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); + } else { + verify(client, times(0)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); + } + + openInputStreams.forEach(inputStream -> { + try { + inputStream.close(); + } catch (IOException ex) { + logger.error("Error closing input stream"); + } + }); + } + } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index 2437547a80a6f..b753b847df869 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -82,7 +82,7 @@ public void 
testOneChunkUpload() { s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, false, null), + }, false, null, true), new StreamContext((partIdx, partSize, position) -> { streamRef.set(new ZeroInputStream(partSize)); return new InputStreamContainer(streamRef.get(), partSize, position); @@ -127,7 +127,7 @@ public void testOneChunkUploadCorruption() { s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(1), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, false, null), + }, false, null, true), new StreamContext( (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), ByteSizeUnit.MB.toBytes(1), @@ -180,7 +180,7 @@ public void testMultipartUpload() { s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, true, 3376132981L), + }, true, 3376132981L, true), new StreamContext((partIdx, partSize, position) -> { InputStream stream = new ZeroInputStream(partSize); streams.add(stream); @@ -240,7 +240,7 @@ public void testMultipartUploadCorruption() { s3AsyncClient, new UploadRequest("bucket", "key", ByteSizeUnit.MB.toBytes(5), WritePriority.HIGH, uploadSuccess -> { // do nothing - }, true, 0L), + }, true, 0L, true), new StreamContext( (partIdx, partSize, position) -> new InputStreamContainer(new ZeroInputStream(partSize), partSize, position), ByteSizeUnit.MB.toBytes(1),