From d22dbe123c2722d32154d8dd0af083b3093f69d1 Mon Sep 17 00:00:00 2001 From: vikasvb90 Date: Mon, 5 Feb 2024 08:50:04 +0530 Subject: [PATCH] Permit backed futures to prevent timeouts during upload bursts --- .../repositories/s3/S3Repository.java | 20 + .../repositories/s3/S3RepositoryPlugin.java | 24 +- .../s3/async/AsyncPartsHandler.java | 119 ++--- .../s3/async/AsyncTransferManager.java | 193 +++++---- .../PermitBackedRetryableFutureUtils.java | 302 +++++++++++++ .../s3/S3BlobContainerMockClientTests.java | 18 +- .../s3/S3BlobContainerRetriesTests.java | 15 +- .../s3/async/AsyncTransferManagerTests.java | 6 +- ...PermitBackedRetryableFutureUtilsTests.java | 406 ++++++++++++++++++ .../blobstore/stream/write/WritePriority.java | 3 +- .../shard/RemoteStoreRefreshListener.java | 6 +- .../index/store/RemoteDirectory.java | 10 +- .../store/RemoteSegmentStoreDirectory.java | 4 +- .../index/store/RemoteDirectoryTests.java | 6 +- .../RemoteSegmentStoreDirectoryTests.java | 4 +- 15 files changed, 972 insertions(+), 164 deletions(-) create mode 100644 plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtils.java create mode 100644 plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtilsTests.java diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index f7772a57c9afd..e7b4f66ce98e2 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -194,6 +194,26 @@ class S3Repository extends MeteredBlobStoreRepository { Setting.Property.NodeScope ); + /** + * Number of retries in case of a transfer failure. 
+ */ + public static Setting S3_MAX_TRANSFER_RETRIES = Setting.intSetting( + "s3_max_transfer_retries", + 3, + Setting.Property.NodeScope + ); + + /** + * Percentage of total available permits to be available for priority transfers. + */ + public static Setting S3_PRIORITY_PERMIT_ALLOCATION_PERCENT = Setting.intSetting( + "s3_priority_permit_alloc_perc", + 70, + 21, + 80, + Setting.Property.NodeScope + ); + /** * Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g. */ diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index e7d2a4d024e60..42a8a09914351 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -53,6 +53,7 @@ import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; import org.opensearch.repositories.s3.async.AsyncTransferManager; +import org.opensearch.repositories.s3.async.PermitBackedRetryableFutureUtils; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; @@ -69,6 +70,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Supplier; /** @@ -81,6 +84,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private static final String PRIORITY_FUTURE_COMPLETION = "priority_future_completion"; private static final String PRIORITY_STREAM_READER = "priority_stream_reader"; private static final String FUTURE_COMPLETION = "future_completion"; + private static final String 
REMOTE_TRANSFER_RETRY = "remote_transfer_retry"; private static final String STREAM_READER = "stream_reader"; protected final S3Service service; @@ -91,6 +95,8 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private AsyncExecutorContainer urgentExecutorBuilder; private AsyncExecutorContainer priorityExecutorBuilder; private AsyncExecutorContainer normalExecutorBuilder; + private ExecutorService remoteTransferRetryPool; + private ScheduledExecutorService scheduler; public S3RepositoryPlugin(final Settings settings, final Path configPath) { this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath)); @@ -120,6 +126,9 @@ public List> getExecutorBuilders(Settings settings) { TimeValue.timeValueMinutes(5) ) ); + executorBuilders.add( + new ScalingExecutorBuilder(REMOTE_TRANSFER_RETRY, allocatedProcessors(settings), allocatedProcessors(settings) * 2, TimeValue.timeValueMinutes(5)) + ); return executorBuilders; } @@ -189,6 +198,8 @@ public Collection createComponents( threadPool.executor(STREAM_READER), new AsyncTransferEventLoopGroup(normalEventLoopThreads) ); + this.remoteTransferRetryPool = threadPool.executor(REMOTE_TRANSFER_RETRY); + this.scheduler = threadPool.scheduler(); return Collections.emptyList(); } @@ -204,7 +215,14 @@ protected S3Repository createRepository( S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(), normalExecutorBuilder.getStreamReader(), priorityExecutorBuilder.getStreamReader(), - urgentExecutorBuilder.getStreamReader() + urgentExecutorBuilder.getStreamReader(), + new PermitBackedRetryableFutureUtils<>( + S3Repository.S3_MAX_TRANSFER_RETRIES.get(clusterService.getSettings()), + // High permit allocation because each op acquiring permit performs disk IO, computation and network IO. 
+ Math.max(allocatedProcessors(clusterService.getSettings()) * 5, 10), + ((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings()))/100, + remoteTransferRetryPool, scheduler) + ); return new S3Repository( metadata, @@ -263,7 +281,9 @@ public List> getSettings() { S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING, S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING, S3Repository.REDIRECT_LARGE_S3_UPLOAD, - S3Repository.UPLOAD_RETRY_ENABLED + S3Repository.UPLOAD_RETRY_ENABLED, + S3Repository.S3_MAX_TRANSFER_RETRIES, + S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java index b4c4ed0ecaa75..c0c5b09ca8c2c 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java @@ -8,6 +8,7 @@ package org.opensearch.repositories.s3.async; +import org.opensearch.repositories.s3.StatsMetricPublisher; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; @@ -24,7 +25,6 @@ import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.io.InputStreamContainer; import org.opensearch.repositories.s3.SocketAccess; -import org.opensearch.repositories.s3.StatsMetricPublisher; import org.opensearch.repositories.s3.io.CheckedContainer; import java.io.BufferedInputStream; @@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Supplier; /** * Responsible for handling parts of the 
original multipart request @@ -56,8 +57,8 @@ public class AsyncPartsHandler { * @param inputStreamContainers Checksum containers * @param statsMetricPublisher sdk metric publisher * @return list of completable futures - * @throws IOException thrown in case of an IO error */ + @SuppressWarnings({"rawtypes", "unchecked"}) public static List> uploadParts( S3AsyncClient s3AsyncClient, ExecutorService executorService, @@ -69,35 +70,48 @@ public static List> uploadParts( AtomicReferenceArray completedParts, AtomicReferenceArray inputStreamContainers, StatsMetricPublisher statsMetricPublisher, - boolean uploadRetryEnabled - ) throws IOException { + boolean uploadRetryEnabled, + PermitBackedRetryableFutureUtils permitBackedRetryableFutureUtils + ) throws InterruptedException { List> futures = new ArrayList<>(); + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBackedRetryableFutureUtils.createRequestContext(); for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) { - InputStreamContainer inputStreamContainer = streamContext.provideStream(partIdx); - inputStreamContainers.set(partIdx, new CheckedContainer(inputStreamContainer.getContentLength())); - UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder() - .bucket(uploadRequest.getBucket()) - .partNumber(partIdx + 1) - .key(uploadRequest.getKey()) - .uploadId(uploadId) - .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)) - .contentLength(inputStreamContainer.getContentLength()); - if (uploadRequest.doRemoteDataIntegrityCheck()) { - uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); - } - uploadPart( - s3AsyncClient, - executorService, - priorityExecutorService, - urgentExecutorService, - completedParts, - inputStreamContainers, - futures, - uploadPartRequestBuilder.build(), - inputStreamContainer, - uploadRequest, - uploadRetryEnabled - ); + int finalPartIdx = partIdx; + Supplier> 
partFutureSupplier = () -> { + InputStreamContainer inputStreamContainer; + try { + inputStreamContainer = streamContext.provideStream(finalPartIdx); + } catch (IOException e) { + return CompletableFuture.failedFuture(e); + } + inputStreamContainers.set(finalPartIdx, new CheckedContainer(inputStreamContainer.getContentLength())); + UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder() + .bucket(uploadRequest.getBucket()) + .partNumber(finalPartIdx + 1) + .key(uploadRequest.getKey()) + .uploadId(uploadId) + .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)) + .contentLength(inputStreamContainer.getContentLength()); + if (uploadRequest.doRemoteDataIntegrityCheck()) { + uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); + } + return uploadPart( + s3AsyncClient, + executorService, + priorityExecutorService, + urgentExecutorService, + completedParts, + inputStreamContainers, + uploadPartRequestBuilder.build(), + inputStreamContainer, + uploadRequest, + uploadRetryEnabled + ); + }; + + CompletableFuture retryableFuture = permitBackedRetryableFutureUtils.createPermitBackedRetryableFuture( + partFutureSupplier, uploadRequest.getWritePriority(), requestContext); + futures.add(retryableFuture); } return futures; @@ -133,26 +147,21 @@ public static void cleanUpParts(S3AsyncClient s3AsyncClient, UploadRequest uploa })); } - public static InputStream maybeRetryInputStream( - InputStream inputStream, - WritePriority writePriority, - boolean uploadRetryEnabled, - long contentLength - ) { + public static InputStream maybeRetryInputStream(InputStream inputStream, WritePriority writePriority, boolean uploadRetryEnabled, + long contentLength) { if (uploadRetryEnabled == true && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) { return new BufferedInputStream(inputStream, (int) (contentLength + 1)); } return inputStream; } - private static void uploadPart( + 
private static CompletableFuture uploadPart( S3AsyncClient s3AsyncClient, ExecutorService executorService, ExecutorService priorityExecutorService, ExecutorService urgentExecutorService, AtomicReferenceArray completedParts, AtomicReferenceArray inputStreamContainers, - List> futures, UploadPartRequest uploadPartRequest, InputStreamContainer inputStreamContainer, UploadRequest uploadRequest, @@ -169,12 +178,8 @@ private static void uploadPart( streamReadExecutor = executorService; } - InputStream inputStream = maybeRetryInputStream( - inputStreamContainer.getInputStream(), - uploadRequest.getWritePriority(), - uploadRetryEnabled, - uploadPartRequest.contentLength() - ); + InputStream inputStream = maybeRetryInputStream(inputStreamContainer.getInputStream(), uploadRequest.getWritePriority(), + uploadRetryEnabled, uploadPartRequest.contentLength()); CompletableFuture uploadPartResponseFuture = SocketAccess.doPrivileged( () -> s3AsyncClient.uploadPart( uploadPartRequest, @@ -183,19 +188,19 @@ private static void uploadPart( ); CompletableFuture convertFuture = uploadPartResponseFuture.whenComplete((resp, throwable) -> { - try { - inputStream.close(); - } catch (IOException ex) { - log.error( - () -> new ParameterizedMessage( - "Failed to close stream while uploading a part of idx {} and file {}.", - uploadPartRequest.partNumber(), - uploadPartRequest.key() - ), - ex - ); - } - }) + try { + inputStream.close(); + } catch (IOException ex) { + log.error( + () -> new ParameterizedMessage( + "Failed to close stream while uploading a part of idx {} and file {}.", + uploadPartRequest.partNumber(), + uploadPartRequest.key() + ), + ex + ); + } + }) .thenApply( uploadPartResponse -> convertUploadPartResponse( completedParts, @@ -205,9 +210,9 @@ private static void uploadPart( uploadRequest.doRemoteDataIntegrityCheck() ) ); - futures.add(convertFuture); CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartResponseFuture); + return convertFuture; } private static 
CompletedPart convertUploadPartResponse( diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index 2259780c95276..07086dc002956 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -8,6 +8,21 @@ package org.opensearch.repositories.s3.async; +import com.jcraft.jzlib.JZlib; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.ExceptionsHelper; +import org.opensearch.common.StreamContext; +import org.opensearch.common.blobstore.exception.CorruptFileException; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.io.InputStreamContainer; +import org.opensearch.common.util.ByteUtils; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.repositories.s3.SocketAccess; +import org.opensearch.repositories.s3.StatsMetricPublisher; +import org.opensearch.repositories.s3.io.CheckedContainer; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.http.HttpStatusCode; @@ -24,21 +39,6 @@ import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.utils.CompletableFutureUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.ExceptionsHelper; -import org.opensearch.common.StreamContext; -import org.opensearch.common.blobstore.exception.CorruptFileException; -import 
org.opensearch.common.blobstore.stream.write.WritePriority; -import org.opensearch.common.io.InputStreamContainer; -import org.opensearch.common.util.ByteUtils; -import org.opensearch.core.common.unit.ByteSizeUnit; -import org.opensearch.core.common.unit.ByteSizeValue; -import org.opensearch.repositories.s3.SocketAccess; -import org.opensearch.repositories.s3.StatsMetricPublisher; -import org.opensearch.repositories.s3.io.CheckedContainer; - import java.io.IOException; import java.io.InputStream; import java.util.Arrays; @@ -52,8 +52,6 @@ import java.util.function.Supplier; import java.util.stream.IntStream; -import com.jcraft.jzlib.JZlib; - /** * A helper class that automatically uses multipart upload based on the size of the source object */ @@ -64,6 +62,9 @@ public final class AsyncTransferManager { private final ExecutorService urgentExecutorService; private final long minimumPartSize; + @SuppressWarnings("rawtypes") + private final PermitBackedRetryableFutureUtils permitBackedRetryableFutureUtils; + /** * The max number of parts on S3 side is 10,000 */ @@ -73,19 +74,20 @@ public final class AsyncTransferManager { * Construct a new object of AsyncTransferManager * * @param minimumPartSize The minimum part size for parallel multipart uploads - * @param executorService The stream reader {@link ExecutorService} for normal priority uploads - * @param priorityExecutorService The stream read {@link ExecutorService} for high priority uploads */ + @SuppressWarnings("rawtypes") public AsyncTransferManager( long minimumPartSize, ExecutorService executorService, ExecutorService priorityExecutorService, - ExecutorService urgentExecutorService + ExecutorService urgentExecutorService, + PermitBackedRetryableFutureUtils permitBackedRetryableFutureUtils ) { this.executorService = executorService; this.priorityExecutorService = priorityExecutorService; this.minimumPartSize = minimumPartSize; this.urgentExecutorService = urgentExecutorService; + 
this.permitBackedRetryableFutureUtils = permitBackedRetryableFutureUtils; } /** @@ -107,7 +109,7 @@ public CompletableFuture uploadObject( try { if (streamContext.getNumberOfParts() == 1) { log.debug(() -> "Starting the upload as a single upload part request"); - uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture, statsMetricPublisher); + uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher); } else { log.debug(() -> "Starting the upload as multipart upload request"); uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher); @@ -141,21 +143,19 @@ private void uploadInParts( // Ensure cancellations are forwarded to the createMultipartUploadFuture future CompletableFutureUtils.forwardExceptionTo(returnFuture, createMultipartUploadFuture); - createMultipartUploadFuture.whenComplete((createMultipartUploadResponse, throwable) -> { - if (throwable != null) { - handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable); - } else { - log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId()); - doUploadInParts( - s3AsyncClient, - uploadRequest, - streamContext, - returnFuture, - createMultipartUploadResponse.uploadId(), - statsMetricPublisher - ); - } - }); + String uploadId; + try { + // Block main thread here so that upload of parts doesn't get executed in future completion thread. + // We should never execute latent operation like acquisition of permit in future completion pool. 
+ CreateMultipartUploadResponse createMultipartUploadResponse = createMultipartUploadFuture.get(); + uploadId = createMultipartUploadResponse.uploadId(); + log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId()); + } catch (Exception ex) { + handleException(returnFuture, () -> "Failed to initiate multipart upload", ex); + return; + } + + doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, uploadId, statsMetricPublisher); } private void doUploadInParts( @@ -184,7 +184,8 @@ private void doUploadInParts( completedParts, inputStreamContainers, statsMetricPublisher, - uploadRequest.isUploadRetryEnabled() + uploadRequest.isUploadRetryEnabled(), + permitBackedRetryableFutureUtils ); } catch (Exception ex) { try { @@ -196,18 +197,18 @@ private void doUploadInParts( } CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(CompletableFuture[]::new)).thenApply(resp -> { - try { - uploadRequest.getUploadFinalizer().accept(true); - } catch (IOException e) { - throw new RuntimeException(e); - } - return resp; - }).thenApply(ignore -> { - if (uploadRequest.doRemoteDataIntegrityCheck()) { - mergeAndVerifyChecksum(inputStreamContainers, uploadRequest.getKey(), uploadRequest.getExpectedChecksum()); - } - return null; - }) + try { + uploadRequest.getUploadFinalizer().accept(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + return resp; + }).thenApply(ignore -> { + if (uploadRequest.doRemoteDataIntegrityCheck()) { + mergeAndVerifyChecksum(inputStreamContainers, uploadRequest.getKey(), uploadRequest.getExpectedChecksum()); + } + return null; + }) .thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts, statsMetricPublisher)) .handle(handleExceptionOrResponse(s3AsyncClient, uploadRequest, returnFuture, uploadId)) .exceptionally(throwable -> { @@ -315,13 +316,14 @@ public long calculateOptimalPartSize(long contentLengthOfSource, 
WritePriority w return (long) Math.max(optimalPartSize, minimumPartSize); } + @SuppressWarnings("unchecked") private void uploadInOneChunk( S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, - InputStreamContainer inputStreamContainer, + StreamContext streamContext, CompletableFuture returnFuture, StatsMetricPublisher statsMetricPublisher - ) { + ) throws InterruptedException { PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder() .bucket(uploadRequest.getBucket()) .key(uploadRequest.getKey()) @@ -340,53 +342,64 @@ private void uploadInOneChunk( streamReadExecutor = executorService; } - InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream( - inputStreamContainer.getInputStream(), - uploadRequest.getWritePriority(), - uploadRequest.isUploadRetryEnabled(), - uploadRequest.getContentLength() - ); - CompletableFuture putObjectFuture = SocketAccess.doPrivileged( - () -> s3AsyncClient.putObject( - putObjectRequestBuilder.build(), - AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor) - ).handle((resp, throwable) -> { + + Supplier> putObjectFutureSupplier = () -> SocketAccess.doPrivileged( + () -> { + InputStreamContainer inputStreamContainer; try { - inputStream.close(); + inputStreamContainer = streamContext.provideStream(0); } catch (IOException e) { - log.error( - () -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", uploadRequest.getKey()), - e - ); + return CompletableFuture.failedFuture(e); } - if (throwable != null) { - Throwable unwrappedThrowable = ExceptionsHelper.unwrap(throwable, S3Exception.class); - if (unwrappedThrowable != null) { - S3Exception s3Exception = (S3Exception) unwrappedThrowable; - if (s3Exception.statusCode() == HttpStatusCode.BAD_REQUEST - && "BadDigest".equals(s3Exception.awsErrorDetails().errorCode())) { - throw new RuntimeException(new CorruptFileException(s3Exception, uploadRequest.getKey())); - } - } - 
returnFuture.completeExceptionally(throwable); - } else { + InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream(inputStreamContainer.getInputStream(), + uploadRequest.getWritePriority(), + uploadRequest.isUploadRetryEnabled(), uploadRequest.getContentLength()); + return s3AsyncClient.putObject( + putObjectRequestBuilder.build(), + AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor) + ).handle((resp, throwable) -> { try { - uploadRequest.getUploadFinalizer().accept(true); + inputStream.close(); } catch (IOException e) { - throw new RuntimeException(e); + log.error( + () -> new ParameterizedMessage("Failed to close stream while uploading single file {}.", uploadRequest.getKey()), + e + ); + } + if (throwable != null) { + Throwable unwrappedThrowable = ExceptionsHelper.unwrap(throwable, S3Exception.class); + if (unwrappedThrowable != null) { + S3Exception s3Exception = (S3Exception) unwrappedThrowable; + if (s3Exception.statusCode() == HttpStatusCode.BAD_REQUEST + && "BadDigest".equals(s3Exception.awsErrorDetails().errorCode())) { + throw new RuntimeException(new CorruptFileException(s3Exception, uploadRequest.getKey())); + } + } + returnFuture.completeExceptionally(throwable); + } else { + try { + uploadRequest.getUploadFinalizer().accept(true); + } catch (IOException e) { + throw new RuntimeException(e); + } + returnFuture.complete(null); } - returnFuture.complete(null); - } - return null; - }).handle((resp, throwable) -> { - if (throwable != null) { - deleteUploadedObject(s3AsyncClient, uploadRequest); - returnFuture.completeExceptionally(throwable); - } + return null; + }).handle((resp, throwable) -> { + if (throwable != null) { + deleteUploadedObject(s3AsyncClient, uploadRequest); + returnFuture.completeExceptionally(throwable); + } - return null; - }) + return null; + }); + } + ); + + PermitBackedRetryableFutureUtils.RequestContext requestContext = 
permitBackedRetryableFutureUtils.createRequestContext(); + CompletableFuture putObjectFuture = permitBackedRetryableFutureUtils.createPermitBackedRetryableFuture( + putObjectFutureSupplier, uploadRequest.getWritePriority(), requestContext ); CompletableFutureUtils.forwardExceptionTo(returnFuture, putObjectFuture); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtils.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtils.java new file mode 100644 index 0000000000000..c085957232cdd --- /dev/null +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtils.java @@ -0,0 +1,302 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.async; + + +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.bulk.BackoffPolicy; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.unit.TimeValue; +import software.amazon.awssdk.core.exception.SdkException; + +import java.util.Iterator; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Async wrapper over a completable future backed by transfer permits which provides natural backpressure in case + * of transfer bursts. 
Additionally, it retries futures (with exp backoff) which fail due to S3 exception or + * when permit couldn't be acquired within timeout period. + * + * @param Type of future response + */ +public class PermitBackedRetryableFutureUtils { + + // Package access for testing. + Semaphore lowPrioritySemaphore; + Semaphore highPrioritySemaphore; + private final int lowPriorityPermits; + private final int highPriorityPermits; + private final int maxRetryAttempts; + private static final int RETRY_BASE_INTERVAL_MILLIS = 1_000; + private final AtomicBoolean lowPriorityTransferProgress; + + private final ExecutorService remoteTransferRetryPool; + private final ScheduledExecutorService scheduler; + /** + * + * @param maxRetryAttempts max number of retries + * @param availablePermits Total available permits for transfer + * @param priorityPermitAllocation Allocation bandwidth of priority permits. Rest will be allocated to low + * priority permits. + */ + public PermitBackedRetryableFutureUtils(int maxRetryAttempts, int availablePermits, double priorityPermitAllocation, + ExecutorService remoteTransferRetryPool, ScheduledExecutorService scheduler + ) { + this.maxRetryAttempts = maxRetryAttempts; + this.highPriorityPermits = (int) (priorityPermitAllocation * availablePermits); + this.highPrioritySemaphore = new TypeSemaphore(highPriorityPermits, "high"); + this.lowPriorityPermits = availablePermits - highPriorityPermits; + this.lowPrioritySemaphore = new TypeSemaphore(lowPriorityPermits, "low"); + this.lowPriorityTransferProgress = new AtomicBoolean(); + this.remoteTransferRetryPool = remoteTransferRetryPool; + this.scheduler = scheduler; + } + + /** + * Available low priority permits + * @return available low priority permits + */ + public int getAvailableLowPriorityPermits() { + return lowPrioritySemaphore.availablePermits(); + } + + /** + * Available high priority permits + * @return available high priority permits + */ + public int getAvailableHighPriorityPermits() { + 
return highPrioritySemaphore.availablePermits(); + } + + /** + * Named semaphore for debugging purposes + */ + static class TypeSemaphore extends Semaphore { + private final String type; + + public TypeSemaphore(int permits, String type) { + super(permits); + this.type = type; + } + + @Override + public String toString() { + String toStr = super.toString(); + return toStr + " , type = " + type; + } + + public String getType() { + return type; + } + + } + + /** + * For multiple part requests of a single file, request context object will be set with the decision if low + * priority permits can also be utilized in high priority transfers of parts of the file. If high priority permits get fully + * consumed then low priority permits will be acquired for transfer. + * + * If a low priority transfer request comes in and a high priority transfer is in progress then till current + * high priority transfer finishes, low priority transfer may have to compete. This is an acceptable side effect + * because low priority transfers are generally heavy and it is ok to have slow progress in the beginning. + * + */ + public static class RequestContext { + + private final boolean lowPriorityPermitsConsumable; + private RequestContext(boolean lowPriorityPermitsConsumable) { + this.lowPriorityPermitsConsumable = lowPriorityPermitsConsumable; + } + } + + public RequestContext createRequestContext() { + return new RequestContext(this.lowPrioritySemaphore.availablePermits() == lowPriorityPermits); + } + + /** + * Custom exception to distinguish retryable futures. 
+ */ + static class RetryableException extends CompletionException { + private final Iterator retryBackoffDelayIterator; + + public RetryableException(Iterator retryBackoffDelayIterator, String message, Throwable cause) { + super(message, cause); + this.retryBackoffDelayIterator = retryBackoffDelayIterator; + } + + public RetryableException(Iterator retryBackoffDelayIterator) { + this.retryBackoffDelayIterator = retryBackoffDelayIterator; + } + } + + /** + * DelayedExecutor and TaskSubmitter are copied from CompletableFuture. Duplicate classes are needed because + * scheduler used by these cannot be overridden and we need a way to manage it from outside. + */ + private static final class DelayedExecutor implements Executor { + private final long delay; + private final TimeUnit unit; + private final Executor executor; + private final ScheduledExecutorService scheduler; + + DelayedExecutor(long delay, TimeUnit unit, Executor executor, ScheduledExecutorService scheduler) { + this.delay = delay; this.unit = unit; this.executor = executor; this.scheduler = scheduler; + } + + public void execute(Runnable r) { + scheduler.schedule(new TaskSubmitter(executor, r), delay, unit); + } + } + + private static final class TaskSubmitter implements Runnable { + final Executor executor; + final Runnable action; + TaskSubmitter(Executor executor, Runnable action) { + this.executor = executor; + this.action = action; + } + public void run() { executor.execute(action); } + } + + /** + * + * @param futureSupplier Supplier of the completable future + * @param writePriority Priority of transfer + * @param requestContext Request context object to set the decisions pertaining to transfer before transfers are + * initiated. + * + * @return completable future backed by permits and retryable future. 
+ */ + public CompletableFuture createPermitBackedRetryableFuture(Supplier> futureSupplier, + WritePriority writePriority, + RequestContext requestContext) { + Iterator retryBackoffDelayIterator = BackoffPolicy.exponentialBackoff( + TimeValue.timeValueMillis(RETRY_BASE_INTERVAL_MILLIS), + maxRetryAttempts + ).iterator(); + Supplier> permitBackedFutureSupplier = createPermitBackedFutureSupplier( + retryBackoffDelayIterator, requestContext.lowPriorityPermitsConsumable, + futureSupplier, writePriority); + + CompletableFuture permitBackedFuture; + try { + permitBackedFuture = permitBackedFutureSupplier.get(); + } catch (RetryableException re) { + // We need special handling when an exception occurs during first future creation itself. + permitBackedFuture = retry(re, permitBackedFutureSupplier, retryBackoffDelayIterator); + } catch (Exception ex) { + return CompletableFuture.failedFuture(ex); + } + + return flatten(permitBackedFuture.thenApply(CompletableFuture::completedFuture) + .exceptionally(t -> retry(t, permitBackedFutureSupplier, retryBackoffDelayIterator))); + } + + private static CompletableFuture flatten(CompletableFuture> completableCompletable) { + return completableCompletable.thenCompose(Function.identity()); + } + + private CompletableFuture retry(Throwable ex, + Supplier> futureSupplier, + Iterator retryBackoffDelayIterator) { + if(!(ex instanceof RetryableException)) { + return CompletableFuture.failedFuture(ex); + } + + RetryableException retryableException = (RetryableException) ex; + if (!retryBackoffDelayIterator.hasNext()) { + return CompletableFuture.failedFuture(ex); + } + + return flatten(flatten(CompletableFuture.supplyAsync(futureSupplier, + new DelayedExecutor(retryableException.retryBackoffDelayIterator.next().millis(), + TimeUnit.MILLISECONDS, remoteTransferRetryPool, scheduler))) + .thenApply(CompletableFuture::completedFuture) + .exceptionally(t -> { + if (t instanceof RetryableException) { + ex.addSuppressed(t); + return retry(ex, 
futureSupplier, retryBackoffDelayIterator); + } else { + ex.addSuppressed(t); + return CompletableFuture.failedFuture(ex); + } + } + )); + } + + // Package access for testing + Semaphore acquirePermit(WritePriority writePriority, boolean isLowPriorityPermitsConsumable) throws InterruptedException { + // Try acquiring low priority permit or high priority permit immediately if available. + // Otherwise, we wait for low priority permit. + if (Objects.requireNonNull(writePriority) == WritePriority.LOW) { + if (lowPrioritySemaphore.tryAcquire()) { + return lowPrioritySemaphore; + } else if (highPrioritySemaphore.availablePermits() > 0.4 * highPriorityPermits && highPrioritySemaphore.tryAcquire()) { + return highPrioritySemaphore; + } else if (lowPrioritySemaphore.tryAcquire(5, TimeUnit.MINUTES)) { + return lowPrioritySemaphore; + } + return null; + } + + // Try acquiring high priority permit or low priority permit immediately if available. + // Otherwise, we wait for high priority permit. + if (highPrioritySemaphore.tryAcquire()) { + return highPrioritySemaphore; + } else if (isLowPriorityPermitsConsumable && lowPrioritySemaphore.tryAcquire()) { + return lowPrioritySemaphore; + } else if (highPrioritySemaphore.tryAcquire(5, TimeUnit.MINUTES)) { + return highPrioritySemaphore; + } + return null; + } + + private Supplier> createPermitBackedFutureSupplier( + Iterator retryBackoffDelayIterator, + boolean lowPriorityPermitsConsumable, + Supplier> futureSupplier, + WritePriority writePriority) { + return () -> { + Semaphore semaphore; + try { + semaphore = acquirePermit(writePriority, lowPriorityPermitsConsumable); + if (semaphore == null) { + throw new RetryableException(retryBackoffDelayIterator); + } + } catch (InterruptedException e) { + throw new CompletionException(e); + } + return futureSupplier.get().handle((resp, t) -> { + try { + if (t != null) { + Throwable ex = ExceptionsHelper.unwrap(t, SdkException.class); + if (ex != null) { + throw new 
RetryableException(retryBackoffDelayIterator, t.getMessage(), t); + } + throw new CompletionException(t); + } + return resp; + } finally { + semaphore.release(); + } + }); + }; + } + +} diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 8c7e196d7c812..a07f59d04c84a 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -8,6 +8,7 @@ package org.opensearch.repositories.s3; +import org.opensearch.repositories.s3.async.PermitBackedRetryableFutureUtils; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.core.sync.RequestBody; @@ -63,6 +64,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -88,6 +91,8 @@ public class S3BlobContainerMockClientTests extends OpenSearchTestCase implement private MockS3AsyncService asyncService; private ExecutorService futureCompletionService; private ExecutorService streamReaderService; + private ExecutorService remoteTransferRetry; + private ScheduledExecutorService scheduler; private AsyncTransferEventLoopGroup transferNIOGroup; private S3BlobContainer blobContainer; @@ -361,6 +366,8 @@ public void setUp() throws Exception { asyncService = new MockS3AsyncService(configPath(), 1000); futureCompletionService = Executors.newSingleThreadExecutor(); streamReaderService = 
Executors.newSingleThreadExecutor(); + remoteTransferRetry = Executors.newFixedThreadPool(20); + scheduler = new ScheduledThreadPoolExecutor(1); transferNIOGroup = new AsyncTransferEventLoopGroup(1); blobContainer = createBlobContainer(); super.setUp(); @@ -370,6 +377,11 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { IOUtils.close(asyncService); + futureCompletionService.shutdown(); + streamReaderService.shutdown(); + remoteTransferRetry.shutdown(); + scheduler.shutdown(); + transferNIOGroup.close(); super.tearDown(); } @@ -407,7 +419,11 @@ private S3BlobStore createBlobStore() { S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader(), - asyncExecutorContainer.getStreamReader() + asyncExecutorContainer.getStreamReader(), + new + PermitBackedRetryableFutureUtils<>(3, + Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), 0.7, + remoteTransferRetry, scheduler) ), asyncExecutorContainer, asyncExecutorContainer, diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index ceab06bd051e9..3fa55e512217a 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -31,6 +31,7 @@ package org.opensearch.repositories.s3; +import org.opensearch.repositories.s3.async.PermitBackedRetryableFutureUtils; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.io.SdkDigestInputStream; import software.amazon.awssdk.utils.internal.Base16; @@ -86,6 +87,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -113,6 +116,8 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes private S3AsyncService asyncService; private ExecutorService futureCompletionService; private ExecutorService streamReaderService; + private ExecutorService remoteTransferRetry; + private ScheduledExecutorService scheduler; private AsyncTransferEventLoopGroup transferNIOGroup; @Before @@ -124,6 +129,8 @@ public void setUp() throws Exception { futureCompletionService = Executors.newSingleThreadExecutor(); streamReaderService = Executors.newSingleThreadExecutor(); transferNIOGroup = new AsyncTransferEventLoopGroup(1); + remoteTransferRetry = Executors.newFixedThreadPool(20); + scheduler = new ScheduledThreadPoolExecutor(1); // needed by S3AsyncService SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", configPath().toString())); @@ -136,6 +143,8 @@ public void tearDown() throws Exception { streamReaderService.shutdown(); futureCompletionService.shutdown(); + remoteTransferRetry.shutdown(); + scheduler.shutdown(); IOUtils.close(transferNIOGroup); if (previousOpenSearchPathConf != null) { @@ -222,7 +231,11 @@ protected AsyncMultiStreamBlobContainer createBlobContainer( S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader(), - asyncExecutorContainer.getStreamReader() + asyncExecutorContainer.getStreamReader(), + new + PermitBackedRetryableFutureUtils<>(3, + Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), 0.7, + remoteTransferRetry, scheduler) ), asyncExecutorContainer, asyncExecutorContainer, diff --git 
a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index b753b847df869..92ad0175833af 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -65,7 +65,11 @@ public void setUp() throws Exception { ByteSizeUnit.MB.toBytes(5), Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor(), - Executors.newSingleThreadExecutor() + Executors.newSingleThreadExecutor(), + new + PermitBackedRetryableFutureUtils<>(3, + Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), 0.7, + Executors.newSingleThreadExecutor(), Executors.newSingleThreadScheduledExecutor()) ); super.setUp(); } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtilsTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtilsTests.java new file mode 100644 index 0000000000000..52a379c22dea9 --- /dev/null +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtilsTests.java @@ -0,0 +1,406 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.repositories.s3.async; + +import org.junit.After; +import org.junit.Before; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.repositories.s3.SocketAccess; +import org.opensearch.test.OpenSearchTestCase; +import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException; +import software.amazon.awssdk.utils.CompletableFutureUtils; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +@SuppressWarnings({"unchecked", "rawtypes"}) +public class PermitBackedRetryableFutureUtilsTests extends OpenSearchTestCase { + private ExecutorService testExecutor; + private ScheduledExecutorService scheduler; + + @Before + public void setup() { + this.testExecutor = Executors.newFixedThreadPool(30); + this.scheduler = Executors.newSingleThreadScheduledExecutor(); + } + + @After + public void cleanUp() { + testExecutor.shutdown(); + scheduler.shutdown(); + } + + public void testFutureExecAndPermitRelease() throws InterruptedException, ExecutionException { + PermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new + PermitBackedRetryableFutureUtils(3, Runtime.getRuntime().availableProcessors(), + 0.7, testExecutor, scheduler); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + assertEquals( (int)(0.7 * Runtime.getRuntime().availableProcessors()), maxHighPermits); + 
assertEquals(Runtime.getRuntime().availableProcessors() - maxHighPermits, + permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + + Supplier> supplier = () -> SocketAccess.doPrivileged(()-> CompletableFuture.supplyAsync(() -> { + assertEquals(maxHighPermits - 1, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + return "success"; + }, testExecutor)); + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.HIGH, requestContext); + resultFuture.get(); + + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } + + static class TestPermitBackedRetryableFutureUtils extends PermitBackedRetryableFutureUtils { + + private final AtomicBoolean highPermitsFullyConsumed = new AtomicBoolean(); + private final AtomicBoolean lowPermitsFullyConsumed = new AtomicBoolean(); + private final AtomicBoolean lowPermitConsumedForHighPriority = new AtomicBoolean(); + private final AtomicBoolean highPermitsConsumedForLowPriority = new AtomicBoolean(); + private final AtomicBoolean waitedForPermit = new AtomicBoolean(); + private final Runnable onWaitForPermit; + + /** + * @param maxRetryAttempts max number of retries + * @param availablePermits Total available permits for transfer + * @param priorityPermitAllocation Allocation bandwidth of priority permits. Rest will be allocated to low + * priority permits. 
+ */ + public TestPermitBackedRetryableFutureUtils(int maxRetryAttempts, int availablePermits, double priorityPermitAllocation, + ExecutorService remoteTransferRetry, ScheduledExecutorService scheduler) { + super(maxRetryAttempts, availablePermits, priorityPermitAllocation, remoteTransferRetry, scheduler); + this.onWaitForPermit = null; + + } + + static class TestSemaphore extends TypeSemaphore { + private final Runnable onWaitForPermit; + private final Supplier preAcquireFailure; + public TestSemaphore(int permits, String type, Runnable onWaitForPermit, Supplier preAcquireFailure) { + super(permits, type); + this.onWaitForPermit = onWaitForPermit; + this.preAcquireFailure = preAcquireFailure; + } + + @Override + public boolean tryAcquire(long timeout, TimeUnit unit) + throws InterruptedException { + if (preAcquireFailure != null && preAcquireFailure.get()) { + return false; + } + onWaitForPermit.run(); + return super.tryAcquire(timeout, unit); + } + + @Override + public boolean tryAcquire() { + if (preAcquireFailure != null && preAcquireFailure.get()) { + return false; + } + return super.tryAcquire(); + } + } + + public TestPermitBackedRetryableFutureUtils(int maxRetryAttempts, int availablePermits, + double priorityPermitAllocation, Runnable onWaitForPermit, + Supplier preAcquireFailure, + ExecutorService remoteTransferRetry, + ScheduledExecutorService scheduler) { + super(maxRetryAttempts, availablePermits, priorityPermitAllocation, remoteTransferRetry, scheduler); + this.onWaitForPermit = () -> { + waitedForPermit.set(true); + onWaitForPermit.run(); + }; + this.highPrioritySemaphore = new TestSemaphore(highPrioritySemaphore.availablePermits(), "high", this.onWaitForPermit, preAcquireFailure); + this.lowPrioritySemaphore = new TestSemaphore(lowPrioritySemaphore.availablePermits(), "low", this.onWaitForPermit, preAcquireFailure); + } + + + Semaphore acquirePermit(WritePriority writePriority, boolean isLowPriorityPermitsConsumable) throws InterruptedException { + 
TypeSemaphore semaphore = (TypeSemaphore) super.acquirePermit(writePriority, isLowPriorityPermitsConsumable); + if (semaphore == null) { + return null; + } + if (semaphore.getType().equals("high")) { + if (getAvailableHighPriorityPermits() == 0) { + highPermitsFullyConsumed.set(true); + } + if (writePriority == WritePriority.LOW) { + highPermitsConsumedForLowPriority.set(true); + } + } else if (semaphore.getType().equals("low")) { + if (getAvailableLowPriorityPermits() == 0) { + lowPermitsFullyConsumed.set(true); + } + if (writePriority == WritePriority.HIGH) { + lowPermitConsumedForHighPriority.set(true); + } + } + return semaphore; + } + } + + public void testLowPermitConsumptionForHighTask() throws InterruptedException, ExecutionException { + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new + TestPermitBackedRetryableFutureUtils(3, Runtime.getRuntime().availableProcessors(), 0.7, + testExecutor, scheduler); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + List> futures = new ArrayList<>(); + CountDownLatch delayedLatch = new CountDownLatch(1); + for (int reqIdx = 0; reqIdx < (maxHighPermits + lowPermits);reqIdx++) { + Supplier> supplier = () -> SocketAccess.doPrivileged(()-> CompletableFuture.supplyAsync(() -> { + try { + // Keep the permit acquired + delayedLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "success"; + } , testExecutor)); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.HIGH, requestContext); + futures.add(resultFuture); + } + // Now release all permits + delayedLatch.countDown(); + + + 
CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(CompletableFuture[]::new)).get(); + assertTrue(permitBasedRetryableFutureUtils.lowPermitsFullyConsumed.get()); + assertTrue(permitBasedRetryableFutureUtils.highPermitsFullyConsumed.get()); + assertTrue(permitBasedRetryableFutureUtils.lowPermitConsumedForHighPriority.get()); + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } + + public void testOnlyHighPermitsAcquiredWhenLowTaskInProgress() throws ExecutionException, InterruptedException { + CountDownLatch delayedLatch = new CountDownLatch(1); + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new + TestPermitBackedRetryableFutureUtils(3, Runtime.getRuntime().availableProcessors(), 0.7, + delayedLatch::countDown, null, testExecutor, scheduler); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext lowRequestContext = permitBasedRetryableFutureUtils.createRequestContext(); + List> futures = new ArrayList<>(); + + Supplier> lowSupplier = () -> SocketAccess.doPrivileged(()-> CompletableFuture.supplyAsync(() -> { + try { + // Keep the permit acquired + delayedLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "success"; + } , testExecutor)); + CompletableFuture lowResultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(lowSupplier, + WritePriority.LOW, lowRequestContext); + futures.add(lowResultFuture); + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + for (int reqIdx = 0; reqIdx < maxHighPermits;reqIdx++) { + Supplier> supplier = () -> 
SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + try { + // Keep the permit acquired + delayedLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "success"; + }, testExecutor)); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.HIGH, requestContext); + futures.add(resultFuture); + } + + Thread t = new Thread(()-> { + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> "success", testExecutor)); + + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture( + supplier, WritePriority.HIGH, requestContext); + futures.add(resultFuture); + }); + t.start(); + t.join(); + + CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(CompletableFuture[]::new)).get(); + assertTrue(permitBasedRetryableFutureUtils.waitedForPermit.get()); + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } + + public void testHighPermitsConsumedForLowTasks() throws ExecutionException, InterruptedException { + CountDownLatch delayedLatch = new CountDownLatch(1); + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new + TestPermitBackedRetryableFutureUtils(3, Runtime.getRuntime().availableProcessors(), 0.7, + delayedLatch::countDown, null, testExecutor, scheduler); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + List> futures = new ArrayList<>(); + for (int reqIdx = 0; reqIdx < lowPermits;reqIdx++) { + Supplier> supplier = () -> 
SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + try { + // Keep the permit acquired + delayedLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "success"; + }, testExecutor)); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.LOW, requestContext); + futures.add(resultFuture); + } + + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + try { + // Keep the permit acquired + delayedLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "success"; + }, testExecutor)); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.LOW, requestContext); + futures.add(resultFuture); + delayedLatch.countDown(); + + CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(CompletableFuture[]::new)).get(); + assertTrue(permitBasedRetryableFutureUtils.highPermitsConsumedForLowPriority.get()); + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } + + public void testFutureRetryOnSemaphoreFailure() throws ExecutionException, InterruptedException { + int retryCount = 3; + // Multiply by 3 as there are 3 ways in which permit can be acquired. 
+ AtomicInteger failureCount = new AtomicInteger((retryCount-1) * 3); + AtomicBoolean exhaustRetries = new AtomicBoolean(); + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new + TestPermitBackedRetryableFutureUtils(retryCount, Runtime.getRuntime().availableProcessors(), 0.7, + null, ()-> { + if (failureCount.get() > 0 || exhaustRetries.get()) { + failureCount.decrementAndGet(); + return true; + } + return false; + }, testExecutor, scheduler); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + List> futures = new ArrayList<>(); + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> "success", testExecutor)); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.HIGH, requestContext); + futures.add(resultFuture); + + CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(CompletableFuture[]::new)).get(); + + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + // Reached here so future executed successfully after retries. 
+ + exhaustRetries.set(true); + resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.HIGH, requestContext); + resultFuture.whenComplete((r,t) -> { + assertNotNull(t); + assertTrue(t instanceof PermitBackedRetryableFutureUtils.RetryableException); + }); + CompletableFuture finalResultFuture = resultFuture; + assertThrows(Exception.class, finalResultFuture::get); + } + + public void testFutureRetryOnExecFailure() throws ExecutionException, InterruptedException { + int retryCount = 3; + AtomicInteger failureCount = new AtomicInteger(retryCount); + AtomicBoolean exhaustRetries = new AtomicBoolean(); + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new + TestPermitBackedRetryableFutureUtils(retryCount, Runtime.getRuntime().availableProcessors(), 0.7, + null, null, testExecutor, scheduler); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + List> futures = new ArrayList<>(); + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + if (failureCount.get() > 0 || exhaustRetries.get()) { + failureCount.decrementAndGet(); + throw ApiCallAttemptTimeoutException.builder().build(); + } + return "success"; + }, testExecutor)); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.HIGH, requestContext); + futures.add(resultFuture); + + CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(CompletableFuture[]::new)).get(); + + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + // Reached here so future 
executed successfully after retries. + + exhaustRetries.set(true); + resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.HIGH, requestContext); + resultFuture.whenComplete((r,t) -> { + assertNotNull(t); + assertTrue(t instanceof PermitBackedRetryableFutureUtils.RetryableException); + }); + CompletableFuture finalResultFuture = resultFuture; + assertThrows(Exception.class, finalResultFuture::get); + + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } + + + public void testNonRetryableFailure() throws ExecutionException, InterruptedException { + // Throw only once to ensure no retries for unknown exception. + AtomicInteger failureCount = new AtomicInteger(1); + AtomicBoolean exhaustRetries = new AtomicBoolean(); + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new + TestPermitBackedRetryableFutureUtils(3, Runtime.getRuntime().availableProcessors(), 0.7, + null, null, testExecutor, scheduler); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + if (failureCount.get() > 0) { + failureCount.decrementAndGet(); + throw new RuntimeException("Generic exception"); + } + return "success"; + }, testExecutor)); + + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.HIGH, requestContext); + resultFuture.whenComplete((r,t) -> { + assertNotNull(t); + assertFalse(t instanceof PermitBackedRetryableFutureUtils.RetryableException); + }); + 
assertThrows(Exception.class, resultFuture::get); + + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java index 3f341c878c3c7..9888612b444bc 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java @@ -16,5 +16,6 @@ public enum WritePriority { NORMAL, HIGH, - URGENT + URGENT, + LOW } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index e96a4bb19a960..c3a012f63b99c 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -409,10 +409,14 @@ private void uploadNewSegments(Collection localSegmentsPostRefresh, Acti batchUploadListener.onFailure(ex); }); statsListener.beforeUpload(src); - remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener); + remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener, isLowPriorityUpload()); } } + private boolean isLowPriorityUpload() { + return isLocalOrSnapshotRecovery(); + } + /** * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded. 
* diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 345583bbbd1be..4fef8c6179c8e 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -323,11 +323,12 @@ public boolean copyFrom( String remoteFileName, IOContext context, Runnable postUploadRunner, - ActionListener listener + ActionListener listener, + boolean lowPriorityUpload ) { if (blobContainer instanceof AsyncMultiStreamBlobContainer) { try { - uploadBlob(from, src, remoteFileName, context, postUploadRunner, listener); + uploadBlob(from, src, remoteFileName, context, postUploadRunner, listener, lowPriorityUpload); } catch (Exception e) { listener.onFailure(e); } @@ -342,7 +343,8 @@ private void uploadBlob( String remoteFileName, IOContext ioContext, Runnable postUploadRunner, - ActionListener listener + ActionListener listener, + boolean lowPriorityUpload ) throws Exception { long expectedChecksum = calculateChecksumOfChecksum(from, src); long contentLength; @@ -358,7 +360,7 @@ private void uploadBlob( remoteFileName, contentLength, true, - WritePriority.NORMAL, + lowPriorityUpload ? 
WritePriority.LOW : WritePriority.NORMAL, (size, position) -> uploadRateLimiter.apply(new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)), expectedChecksum, remoteIntegrityEnabled diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index dab99fd25b192..25915feb9a52a 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -451,7 +451,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException { * @param context IOContext to be used to open IndexInput of file during remote upload * @param listener Listener to handle upload callback events */ - public void copyFrom(Directory from, String src, IOContext context, ActionListener listener) { + public void copyFrom(Directory from, String src, IOContext context, ActionListener listener, boolean lowPriorityUpload) { try { final String remoteFileName = getNewRemoteSegmentFilename(src); boolean uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> { @@ -460,7 +460,7 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen } catch (IOException e) { throw new RuntimeException("Exception in segment postUpload for file " + src, e); } - }, listener); + }, listener, lowPriorityUpload); if (uploaded == false) { copyFrom(from, src, src, context); listener.onResponse(null); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index 9e38e1749d434..ee81369725e6f 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -104,7 +104,8 @@ public void onResponse(Void t) 
{ public void onFailure(Exception e) { fail("Listener responded with exception" + e); } - } + }, + false ); assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); assertTrue(postUploadInvoked.get()); @@ -141,7 +142,8 @@ public void onResponse(Void t) { public void onFailure(Exception e) { countDownLatch.countDown(); } - } + }, + false ); assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); assertFalse(postUploadInvoked.get()); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index 7944ee681f5fc..8c2a0befc0d2d 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -561,7 +561,7 @@ public void onResponse(Void unused) { @Override public void onFailure(Exception e) {} }; - remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener); + remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener, false); assertTrue(latch.await(5000, TimeUnit.SECONDS)); assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); storeDirectory.close(); @@ -605,7 +605,7 @@ public void onFailure(Exception e) { latch.countDown(); } }; - remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener); + remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener, false); assertTrue(latch.await(5000, TimeUnit.SECONDS)); assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));