diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index f7772a57c9afd..f8481e3a9402c 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -194,6 +194,22 @@ class S3Repository extends MeteredBlobStoreRepository { Setting.Property.NodeScope ); + /** + * Number of retries in case of a transfer failure. + */ + public static Setting S3_MAX_TRANSFER_RETRIES = Setting.intSetting("s3_max_transfer_retries", 3, Setting.Property.NodeScope); + + /** + * Percentage of total available permits to be available for priority transfers. + */ + public static Setting S3_PRIORITY_PERMIT_ALLOCATION_PERCENT = Setting.intSetting( + "s3_priority_permit_alloc_perc", + 70, + 21, + 80, + Setting.Property.NodeScope + ); + /** * Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g. 
*/ diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index e7d2a4d024e60..63872f0b98cd8 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -53,6 +53,7 @@ import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; import org.opensearch.repositories.s3.async.AsyncTransferManager; +import org.opensearch.repositories.s3.async.PermitBackedRetryableFutureUtils; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; @@ -69,6 +70,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Supplier; /** @@ -81,6 +84,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private static final String PRIORITY_FUTURE_COMPLETION = "priority_future_completion"; private static final String PRIORITY_STREAM_READER = "priority_stream_reader"; private static final String FUTURE_COMPLETION = "future_completion"; + private static final String REMOTE_TRANSFER_RETRY = "remote_transfer_retry"; private static final String STREAM_READER = "stream_reader"; protected final S3Service service; @@ -91,6 +95,8 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private AsyncExecutorContainer urgentExecutorBuilder; private AsyncExecutorContainer priorityExecutorBuilder; private AsyncExecutorContainer normalExecutorBuilder; + private ExecutorService remoteTransferRetryPool; + private ScheduledExecutorService scheduler; public 
S3RepositoryPlugin(final Settings settings, final Path configPath) { this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath)); @@ -120,6 +126,14 @@ public List> getExecutorBuilders(Settings settings) { TimeValue.timeValueMinutes(5) ) ); + executorBuilders.add( + new ScalingExecutorBuilder( + REMOTE_TRANSFER_RETRY, + allocatedProcessors(settings), + allocatedProcessors(settings) * 2, + TimeValue.timeValueMinutes(5) + ) + ); return executorBuilders; } @@ -189,6 +203,8 @@ public Collection createComponents( threadPool.executor(STREAM_READER), new AsyncTransferEventLoopGroup(normalEventLoopThreads) ); + this.remoteTransferRetryPool = threadPool.executor(REMOTE_TRANSFER_RETRY); + this.scheduler = threadPool.scheduler(); return Collections.emptyList(); } @@ -204,7 +220,16 @@ protected S3Repository createRepository( S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(), normalExecutorBuilder.getStreamReader(), priorityExecutorBuilder.getStreamReader(), - urgentExecutorBuilder.getStreamReader() + urgentExecutorBuilder.getStreamReader(), + new PermitBackedRetryableFutureUtils<>( + S3Repository.S3_MAX_TRANSFER_RETRIES.get(clusterService.getSettings()), + // High permit allocation because each op acquiring permit performs disk IO, computation and network IO. 
+ Math.max(allocatedProcessors(clusterService.getSettings()) * 5, 10), + ((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings())) / 100, + remoteTransferRetryPool, + scheduler + ) + ); return new S3Repository( metadata, @@ -263,7 +288,9 @@ public List> getSettings() { S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING, S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING, S3Repository.REDIRECT_LARGE_S3_UPLOAD, - S3Repository.UPLOAD_RETRY_ENABLED + S3Repository.UPLOAD_RETRY_ENABLED, + S3Repository.S3_MAX_TRANSFER_RETRIES, + S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java index b4c4ed0ecaa75..f3955c9d69186 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java @@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Supplier; /** * Responsible for handling parts of the original multipart request @@ -56,8 +57,8 @@ public class AsyncPartsHandler { * @param inputStreamContainers Checksum containers * @param statsMetricPublisher sdk metric publisher * @return list of completable futures - * @throws IOException thrown in case of an IO error */ + @SuppressWarnings({ "rawtypes", "unchecked" }) public static List> uploadParts( S3AsyncClient s3AsyncClient, ExecutorService executorService, @@ -69,35 +70,51 @@ public static List> uploadParts( AtomicReferenceArray completedParts, AtomicReferenceArray inputStreamContainers, StatsMetricPublisher statsMetricPublisher, - boolean uploadRetryEnabled - ) throws IOException { + 
boolean uploadRetryEnabled, + PermitBackedRetryableFutureUtils permitBackedRetryableFutureUtils + ) throws InterruptedException { List> futures = new ArrayList<>(); + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBackedRetryableFutureUtils.createRequestContext(); for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) { - InputStreamContainer inputStreamContainer = streamContext.provideStream(partIdx); - inputStreamContainers.set(partIdx, new CheckedContainer(inputStreamContainer.getContentLength())); - UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder() - .bucket(uploadRequest.getBucket()) - .partNumber(partIdx + 1) - .key(uploadRequest.getKey()) - .uploadId(uploadId) - .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)) - .contentLength(inputStreamContainer.getContentLength()); - if (uploadRequest.doRemoteDataIntegrityCheck()) { - uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); - } - uploadPart( - s3AsyncClient, - executorService, - priorityExecutorService, - urgentExecutorService, - completedParts, - inputStreamContainers, - futures, - uploadPartRequestBuilder.build(), - inputStreamContainer, - uploadRequest, - uploadRetryEnabled + int finalPartIdx = partIdx; + Supplier> partFutureSupplier = () -> { + InputStreamContainer inputStreamContainer; + try { + inputStreamContainer = streamContext.provideStream(finalPartIdx); + } catch (IOException e) { + return CompletableFuture.failedFuture(e); + } + inputStreamContainers.set(finalPartIdx, new CheckedContainer(inputStreamContainer.getContentLength())); + UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder() + .bucket(uploadRequest.getBucket()) + .partNumber(finalPartIdx + 1) + .key(uploadRequest.getKey()) + .uploadId(uploadId) + .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector)) + 
.contentLength(inputStreamContainer.getContentLength()); + if (uploadRequest.doRemoteDataIntegrityCheck()) { + uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); + } + return uploadPart( + s3AsyncClient, + executorService, + priorityExecutorService, + urgentExecutorService, + completedParts, + inputStreamContainers, + uploadPartRequestBuilder.build(), + inputStreamContainer, + uploadRequest, + uploadRetryEnabled + ); + }; + + CompletableFuture retryableFuture = permitBackedRetryableFutureUtils.createPermitBackedRetryableFuture( + partFutureSupplier, + uploadRequest.getWritePriority(), + requestContext ); + futures.add(retryableFuture); } return futures; @@ -145,14 +162,13 @@ public static InputStream maybeRetryInputStream( return inputStream; } - private static void uploadPart( + private static CompletableFuture uploadPart( S3AsyncClient s3AsyncClient, ExecutorService executorService, ExecutorService priorityExecutorService, ExecutorService urgentExecutorService, AtomicReferenceArray completedParts, AtomicReferenceArray inputStreamContainers, - List> futures, UploadPartRequest uploadPartRequest, InputStreamContainer inputStreamContainer, UploadRequest uploadRequest, @@ -205,9 +221,9 @@ private static void uploadPart( uploadRequest.doRemoteDataIntegrityCheck() ) ); - futures.add(convertFuture); CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartResponseFuture); + return convertFuture; } private static CompletedPart convertUploadPartResponse( diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index 80538059d17b8..ad9ab884b0fe0 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -65,6 +65,9 @@ public final 
class AsyncTransferManager { private final ExecutorService urgentExecutorService; private final long minimumPartSize; + @SuppressWarnings("rawtypes") + private final PermitBackedRetryableFutureUtils permitBackedRetryableFutureUtils; + /** * The max number of parts on S3 side is 10,000 */ @@ -74,19 +77,20 @@ public final class AsyncTransferManager { * Construct a new object of AsyncTransferManager * * @param minimumPartSize The minimum part size for parallel multipart uploads - * @param executorService The stream reader {@link ExecutorService} for normal priority uploads - * @param priorityExecutorService The stream read {@link ExecutorService} for high priority uploads */ + @SuppressWarnings("rawtypes") public AsyncTransferManager( long minimumPartSize, ExecutorService executorService, ExecutorService priorityExecutorService, - ExecutorService urgentExecutorService + ExecutorService urgentExecutorService, + PermitBackedRetryableFutureUtils permitBackedRetryableFutureUtils ) { this.executorService = executorService; this.priorityExecutorService = priorityExecutorService; this.minimumPartSize = minimumPartSize; this.urgentExecutorService = urgentExecutorService; + this.permitBackedRetryableFutureUtils = permitBackedRetryableFutureUtils; } /** @@ -108,7 +112,7 @@ public CompletableFuture uploadObject( try { if (streamContext.getNumberOfParts() == 1) { log.debug(() -> "Starting the upload as a single upload part request"); - uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture, statsMetricPublisher); + uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher); } else { log.debug(() -> "Starting the upload as multipart upload request"); uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher); @@ -146,21 +150,19 @@ private void uploadInParts( // Ensure cancellations are forwarded to the createMultipartUploadFuture future 
CompletableFutureUtils.forwardExceptionTo(returnFuture, createMultipartUploadFuture); - createMultipartUploadFuture.whenComplete((createMultipartUploadResponse, throwable) -> { - if (throwable != null) { - handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable); - } else { - log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId()); - doUploadInParts( - s3AsyncClient, - uploadRequest, - streamContext, - returnFuture, - createMultipartUploadResponse.uploadId(), - statsMetricPublisher - ); - } - }); + String uploadId; + try { + // Block the calling thread here so that the upload of parts doesn't get executed in a future completion thread. + // We should never execute a blocking operation, such as permit acquisition, in the future completion pool. + CreateMultipartUploadResponse createMultipartUploadResponse = createMultipartUploadFuture.get(); + uploadId = createMultipartUploadResponse.uploadId(); + log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId()); + } catch (Exception ex) { + handleException(returnFuture, () -> "Failed to initiate multipart upload", ex); + return; + } + + doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, uploadId, statsMetricPublisher); } private void doUploadInParts( @@ -189,7 +191,8 @@ private void doUploadInParts( completedParts, inputStreamContainers, statsMetricPublisher, - uploadRequest.isUploadRetryEnabled() + uploadRequest.isUploadRetryEnabled(), + permitBackedRetryableFutureUtils ); } catch (Exception ex) { try { @@ -320,13 +323,14 @@ public long calculateOptimalPartSize(long contentLengthOfSource, WritePriority w return (long) Math.max(optimalPartSize, minimumPartSize); } + @SuppressWarnings("unchecked") private void uploadInOneChunk( S3AsyncClient s3AsyncClient, UploadRequest uploadRequest, - InputStreamContainer inputStreamContainer, + StreamContext streamContext, CompletableFuture returnFuture, 
StatsMetricPublisher statsMetricPublisher - ) { + ) throws InterruptedException { PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder() .bucket(uploadRequest.getBucket()) .key(uploadRequest.getKey()) @@ -349,14 +353,20 @@ private void uploadInOneChunk( streamReadExecutor = executorService; } - InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream( - inputStreamContainer.getInputStream(), - uploadRequest.getWritePriority(), - uploadRequest.isUploadRetryEnabled(), - uploadRequest.getContentLength() - ); - CompletableFuture putObjectFuture = SocketAccess.doPrivileged( - () -> s3AsyncClient.putObject( + Supplier> putObjectFutureSupplier = () -> SocketAccess.doPrivileged(() -> { + InputStreamContainer inputStreamContainer; + try { + inputStreamContainer = streamContext.provideStream(0); + } catch (IOException e) { + return CompletableFuture.failedFuture(e); + } + InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream( + inputStreamContainer.getInputStream(), + uploadRequest.getWritePriority(), + uploadRequest.isUploadRetryEnabled(), + uploadRequest.getContentLength() + ); + return s3AsyncClient.putObject( putObjectRequestBuilder.build(), AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor) ).handle((resp, throwable) -> { @@ -395,7 +405,14 @@ private void uploadInOneChunk( } return null; - }) + }); + }); + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBackedRetryableFutureUtils.createRequestContext(); + CompletableFuture putObjectFuture = permitBackedRetryableFutureUtils.createPermitBackedRetryableFuture( + putObjectFutureSupplier, + uploadRequest.getWritePriority(), + requestContext ); CompletableFutureUtils.forwardExceptionTo(returnFuture, putObjectFuture); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtils.java 
b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtils.java new file mode 100644 index 0000000000000..25a45fb28c23f --- /dev/null +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtils.java @@ -0,0 +1,344 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.async; + +import software.amazon.awssdk.core.exception.SdkException; + +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.bulk.BackoffPolicy; +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.common.unit.TimeValue; + +import java.util.Iterator; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * Async wrapper over a completable future backed by transfer permits which provides natural backpressure in case + * of transfer bursts. Additionally, it retries futures (with exp backoff) which fail due to S3 exception or + * when permit couldn't be acquired within timeout period. + * + * @param Type of future response + */ +public class PermitBackedRetryableFutureUtils { + + // Package access for testing. 
+ Semaphore lowPrioritySemaphore; + Semaphore highPrioritySemaphore; + private final int lowPriorityPermits; + private final int highPriorityPermits; + private final int maxRetryAttempts; + private static final int RETRY_BASE_INTERVAL_MILLIS = 1_000; + private final AtomicBoolean lowPriorityTransferProgress; + + private final ExecutorService remoteTransferRetryPool; + private final ScheduledExecutorService scheduler; + + /** + * + * @param maxRetryAttempts max number of retries + * @param availablePermits Total available permits for transfer + * @param priorityPermitAllocation Allocation bandwidth of priority permits. Rest will be allocated to low + * priority permits. + */ + public PermitBackedRetryableFutureUtils( + int maxRetryAttempts, + int availablePermits, + double priorityPermitAllocation, + ExecutorService remoteTransferRetryPool, + ScheduledExecutorService scheduler + ) { + this.maxRetryAttempts = maxRetryAttempts; + this.highPriorityPermits = (int) (priorityPermitAllocation * availablePermits); + this.highPrioritySemaphore = new TypeSemaphore(highPriorityPermits, "high"); + this.lowPriorityPermits = availablePermits - highPriorityPermits; + this.lowPrioritySemaphore = new TypeSemaphore(lowPriorityPermits, "low"); + this.lowPriorityTransferProgress = new AtomicBoolean(); + this.remoteTransferRetryPool = remoteTransferRetryPool; + this.scheduler = scheduler; + } + + /** + * Available low priority permits + * @return available low priority permits + */ + public int getAvailableLowPriorityPermits() { + return lowPrioritySemaphore.availablePermits(); + } + + /** + * Available high priority permits + * @return available high priority permits + */ + public int getAvailableHighPriorityPermits() { + return highPrioritySemaphore.availablePermits(); + } + + /** + * Named semaphore for debugging purpose + */ + static class TypeSemaphore extends Semaphore { + private final String type; + + public TypeSemaphore(int permits, String type) { + super(permits); + this.type 
= type; + } + + @Override + public String toString() { + String toStr = super.toString(); + return toStr + " , type = " + type; + } + + public String getType() { + return type; + } + + } + + /** + * For multiple part requests of a single file, request context object will be set with the decision if low + * priority permits can also be utilized in high priority transfers of parts of the file. If high priority permits get fully + * consumed then low priority permits will be acquired for transfer. + * + * If a low priority transfer request comes in and a high priority transfer is in progress then till current + * high priority transfer finishes, low priority transfer may have to compete. This is an acceptable side effect + * because low priority transfers are generally heavy and it is ok to have slow progress in the beginning. + * + */ + public static class RequestContext { + + private final boolean lowPriorityPermitsConsumable; + + private RequestContext(boolean lowPriorityPermitsConsumable) { + this.lowPriorityPermitsConsumable = lowPriorityPermitsConsumable; + } + } + + public RequestContext createRequestContext() { + return new RequestContext(this.lowPrioritySemaphore.availablePermits() == lowPriorityPermits); + } + + /** + * Custom exception to distinguish retryable futures. + */ + static class RetryableException extends CompletionException { + private final Iterator retryBackoffDelayIterator; + + public RetryableException(Iterator retryBackoffDelayIterator, String message, Throwable cause) { + super(message, cause); + this.retryBackoffDelayIterator = retryBackoffDelayIterator; + } + + public RetryableException(Iterator retryBackoffDelayIterator) { + this.retryBackoffDelayIterator = retryBackoffDelayIterator; + } + } + + /** + * DelayedExecutor and TaskSubmitter are copied from CompletableFuture. Duplicate classes are needed because + * scheduler used by these cannot be overridden and we need a way to manage it from outside. 
+ */ + private static final class DelayedExecutor implements Executor { + private final long delay; + private final TimeUnit unit; + private final Executor executor; + private final ScheduledExecutorService scheduler; + + DelayedExecutor(long delay, TimeUnit unit, Executor executor, ScheduledExecutorService scheduler) { + this.delay = delay; + this.unit = unit; + this.executor = executor; + this.scheduler = scheduler; + } + + public void execute(Runnable r) { + scheduler.schedule(new TaskSubmitter(executor, r), delay, unit); + } + } + + private static final class TaskSubmitter implements Runnable { + final Executor executor; + final Runnable action; + + TaskSubmitter(Executor executor, Runnable action) { + this.executor = executor; + this.action = action; + } + + public void run() { + executor.execute(action); + } + } + + /** + * + * @param futureSupplier Supplier of the completable future + * @param writePriority Priority of transfer + * @param requestContext Request context object to set the decisions pertaining to transfer before transfers are + * initiated. + * + * @return completable future backed by permits and retryable future. + */ + public CompletableFuture createPermitBackedRetryableFuture( + Supplier> futureSupplier, + WritePriority writePriority, + RequestContext requestContext + ) { + Iterator retryBackoffDelayIterator = BackoffPolicy.exponentialBackoff( + TimeValue.timeValueMillis(RETRY_BASE_INTERVAL_MILLIS), + maxRetryAttempts + ).iterator(); + Supplier> permitBackedFutureSupplier = createPermitBackedFutureSupplier( + retryBackoffDelayIterator, + requestContext.lowPriorityPermitsConsumable, + futureSupplier, + writePriority + ); + + CompletableFuture permitBackedFuture; + try { + permitBackedFuture = permitBackedFutureSupplier.get(); + } catch (RetryableException re) { + // We need special handling when an exception occurs during first future creation itself. 
+ permitBackedFuture = retry(re, permitBackedFutureSupplier, retryBackoffDelayIterator); + } catch (Exception ex) { + return CompletableFuture.failedFuture(ex); + } + + return flatten( + permitBackedFuture.thenApply(CompletableFuture::completedFuture) + .exceptionally(t -> retry(t, permitBackedFutureSupplier, retryBackoffDelayIterator)) + ); + } + + private static CompletableFuture flatten( + CompletableFuture> completableCompletable + ) { + return completableCompletable.thenCompose(Function.identity()); + } + + private CompletableFuture retry( + Throwable ex, + Supplier> futureSupplier, + Iterator retryBackoffDelayIterator + ) { + if (!(ex instanceof RetryableException)) { + return CompletableFuture.failedFuture(ex); + } + + RetryableException retryableException = (RetryableException) ex; + if (!retryBackoffDelayIterator.hasNext()) { + return CompletableFuture.failedFuture(ex); + } + + return flatten( + flatten( + CompletableFuture.supplyAsync( + futureSupplier, + new DelayedExecutor( + retryableException.retryBackoffDelayIterator.next().millis(), + TimeUnit.MILLISECONDS, + remoteTransferRetryPool, + scheduler + ) + ) + ).thenApply(CompletableFuture::completedFuture).exceptionally(t -> { + if (t instanceof RetryableException) { + ex.addSuppressed(t); + return retry(ex, futureSupplier, retryBackoffDelayIterator); + } else { + ex.addSuppressed(t); + return CompletableFuture.failedFuture(ex); + } + }) + ); + } + + // Package access for testing + Semaphore acquirePermit(WritePriority writePriority, boolean isLowPriorityPermitsConsumable) throws InterruptedException { + // Try acquiring low priority permit or high priority permit immediately if available. + // Otherwise, we wait for low priority permit. 
+ if (Objects.requireNonNull(writePriority) == WritePriority.LOW) { + if (lowPrioritySemaphore.tryAcquire()) { + return lowPrioritySemaphore; + } else if (highPrioritySemaphore.availablePermits() > 0.4 * highPriorityPermits && highPrioritySemaphore.tryAcquire()) { + return highPrioritySemaphore; + } else if (lowPrioritySemaphore.tryAcquire(5, TimeUnit.MINUTES)) { + return lowPrioritySemaphore; + } + return null; + } + + // Try acquiring high priority permit or low priority permit immediately if available. + // Otherwise, we wait for high priority permit. + if (highPrioritySemaphore.tryAcquire()) { + return highPrioritySemaphore; + } else if (isLowPriorityPermitsConsumable && lowPrioritySemaphore.tryAcquire()) { + return lowPrioritySemaphore; + } else if (highPrioritySemaphore.tryAcquire(5, TimeUnit.MINUTES)) { + return highPrioritySemaphore; + } + return null; + } + + private Supplier> createPermitBackedFutureSupplier( + Iterator retryBackoffDelayIterator, + boolean lowPriorityPermitsConsumable, + Supplier> futureSupplier, + WritePriority writePriority + ) { + return () -> { + Semaphore semaphore; + try { + semaphore = acquirePermit(writePriority, lowPriorityPermitsConsumable); + if (semaphore == null) { + throw new RetryableException(retryBackoffDelayIterator); + } + } catch (InterruptedException e) { + throw new CompletionException(e); + } + + CompletableFuture future; + try { + future = futureSupplier.get(); + } catch (Exception ex) { + // Exception in future creation. Can't retry this. 
+ semaphore.release(); + throw new RuntimeException(ex); + } + + return future.handle((resp, t) -> { + try { + if (t != null) { + Throwable ex = ExceptionsHelper.unwrap(t, SdkException.class); + if (ex != null) { + throw new RetryableException(retryBackoffDelayIterator, t.getMessage(), t); + } + throw new CompletionException(t); + } + return resp; + } finally { + semaphore.release(); + } + }); + }; + } + +} diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 4173f8b66387f..fee00fd9e6ec7 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -47,7 +47,9 @@ import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; import org.opensearch.repositories.s3.async.AsyncTransferManager; +import org.opensearch.repositories.s3.async.PermitBackedRetryableFutureUtils; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.Scheduler; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -65,6 +67,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -91,6 +94,8 @@ public class S3BlobContainerMockClientTests extends OpenSearchTestCase implement private MockS3AsyncService asyncService; private ExecutorService futureCompletionService; private ExecutorService streamReaderService; + private ExecutorService remoteTransferRetry; + private 
ScheduledExecutorService scheduler; private AsyncTransferEventLoopGroup transferNIOGroup; private S3BlobContainer blobContainer; @@ -364,6 +369,8 @@ public void setUp() throws Exception { asyncService = new MockS3AsyncService(configPath(), 1000); futureCompletionService = Executors.newSingleThreadExecutor(); streamReaderService = Executors.newSingleThreadExecutor(); + remoteTransferRetry = Executors.newFixedThreadPool(20); + scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1); transferNIOGroup = new AsyncTransferEventLoopGroup(1); blobContainer = createBlobContainer(); super.setUp(); @@ -373,6 +380,11 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { IOUtils.close(asyncService); + futureCompletionService.shutdown(); + streamReaderService.shutdown(); + remoteTransferRetry.shutdown(); + scheduler.shutdown(); + transferNIOGroup.close(); super.tearDown(); } @@ -410,7 +422,14 @@ private S3BlobStore createBlobStore() { S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader(), - asyncExecutorContainer.getStreamReader() + asyncExecutorContainer.getStreamReader(), + new PermitBackedRetryableFutureUtils<>( + 3, + Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), + 0.7, + remoteTransferRetry, + scheduler + ) ), asyncExecutorContainer, asyncExecutorContainer, diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index 10578090da75c..14691c89c202a 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -67,6 +67,7 @@ import 
org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; import org.opensearch.repositories.s3.async.AsyncTransferManager; +import org.opensearch.repositories.s3.async.PermitBackedRetryableFutureUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -87,6 +88,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -114,6 +117,8 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes private S3AsyncService asyncService; private ExecutorService futureCompletionService; private ExecutorService streamReaderService; + private ExecutorService remoteTransferRetry; + private ScheduledExecutorService scheduler; private AsyncTransferEventLoopGroup transferNIOGroup; @Before @@ -125,6 +130,8 @@ public void setUp() throws Exception { futureCompletionService = Executors.newSingleThreadExecutor(); streamReaderService = Executors.newSingleThreadExecutor(); transferNIOGroup = new AsyncTransferEventLoopGroup(1); + remoteTransferRetry = Executors.newFixedThreadPool(20); + scheduler = new ScheduledThreadPoolExecutor(1); // needed by S3AsyncService SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", configPath().toString())); @@ -137,6 +144,8 @@ public void tearDown() throws Exception { streamReaderService.shutdown(); futureCompletionService.shutdown(); + remoteTransferRetry.shutdown(); + scheduler.shutdown(); IOUtils.close(transferNIOGroup); if (previousOpenSearchPathConf != null) { @@ -223,7 +232,14 @@ protected AsyncMultiStreamBlobContainer createBlobContainer( 
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader(), - asyncExecutorContainer.getStreamReader() + asyncExecutorContainer.getStreamReader(), + new PermitBackedRetryableFutureUtils<>( + 3, + Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), + 0.7, + remoteTransferRetry, + scheduler + ) ), asyncExecutorContainer, asyncExecutorContainer, diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index 04d1819bef02b..e967e2b023465 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -67,7 +67,14 @@ public void setUp() throws Exception { ByteSizeUnit.MB.toBytes(5), Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor(), - Executors.newSingleThreadExecutor() + Executors.newSingleThreadExecutor(), + new PermitBackedRetryableFutureUtils<>( + 3, + Math.max(Runtime.getRuntime().availableProcessors() * 5, 10), + 0.7, + Executors.newSingleThreadExecutor(), + Executors.newSingleThreadScheduledExecutor() + ) ); super.setUp(); } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtilsTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtilsTests.java new file mode 100644 index 0000000000000..b07bf0630f969 --- /dev/null +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtilsTests.java @@ -0,0 +1,505 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to 
+ * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.async; + +import software.amazon.awssdk.core.exception.ApiCallAttemptTimeoutException; +import software.amazon.awssdk.utils.CompletableFutureUtils; + +import org.opensearch.common.blobstore.stream.write.WritePriority; +import org.opensearch.repositories.s3.SocketAccess; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.After; +import org.junit.Before; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +@SuppressWarnings({ "unchecked", "rawtypes" }) +public class PermitBackedRetryableFutureUtilsTests extends OpenSearchTestCase { + private ExecutorService testExecutor; + private ScheduledExecutorService scheduler; + + @Before + public void setup() { + this.testExecutor = Executors.newFixedThreadPool(30); + this.scheduler = Executors.newSingleThreadScheduledExecutor(); + } + + @After + public void cleanUp() { + testExecutor.shutdown(); + scheduler.shutdown(); + } + + public void testFutureExecAndPermitRelease() throws InterruptedException, ExecutionException { + PermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new PermitBackedRetryableFutureUtils( + 3, + Runtime.getRuntime().availableProcessors(), + 0.7, + testExecutor, + scheduler + ); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - 
maxHighPermits; + assertEquals((int) (0.7 * Runtime.getRuntime().availableProcessors()), maxHighPermits); + assertEquals( + Runtime.getRuntime().availableProcessors() - maxHighPermits, + permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits() + ); + + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + assertEquals(maxHighPermits - 1, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + return "success"; + }, testExecutor)); + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture( + supplier, + WritePriority.HIGH, + requestContext + ); + resultFuture.get(); + + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } + + static class TestPermitBackedRetryableFutureUtils extends PermitBackedRetryableFutureUtils { + + private final AtomicBoolean highPermitsFullyConsumed = new AtomicBoolean(); + private final AtomicBoolean lowPermitsFullyConsumed = new AtomicBoolean(); + private final AtomicBoolean lowPermitConsumedForHighPriority = new AtomicBoolean(); + private final AtomicBoolean highPermitsConsumedForLowPriority = new AtomicBoolean(); + private final AtomicBoolean waitedForPermit = new AtomicBoolean(); + private final Runnable onWaitForPermit; + + /** + * @param maxRetryAttempts max number of retries + * @param availablePermits Total available permits for transfer + * @param priorityPermitAllocation Allocation bandwidth of priority permits. Rest will be allocated to low + * priority permits. 
+ */ + public TestPermitBackedRetryableFutureUtils( + int maxRetryAttempts, + int availablePermits, + double priorityPermitAllocation, + ExecutorService remoteTransferRetry, + ScheduledExecutorService scheduler + ) { + super(maxRetryAttempts, availablePermits, priorityPermitAllocation, remoteTransferRetry, scheduler); + this.onWaitForPermit = null; + + } + + static class TestSemaphore extends TypeSemaphore { + private final Runnable onWaitForPermit; + private final Supplier preAcquireFailure; + + public TestSemaphore(int permits, String type, Runnable onWaitForPermit, Supplier preAcquireFailure) { + super(permits, type); + this.onWaitForPermit = onWaitForPermit; + this.preAcquireFailure = preAcquireFailure; + } + + @Override + public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { + if (preAcquireFailure != null && preAcquireFailure.get()) { + return false; + } + onWaitForPermit.run(); + return super.tryAcquire(timeout, unit); + } + + @Override + public boolean tryAcquire() { + if (preAcquireFailure != null && preAcquireFailure.get()) { + return false; + } + return super.tryAcquire(); + } + } + + public TestPermitBackedRetryableFutureUtils( + int maxRetryAttempts, + int availablePermits, + double priorityPermitAllocation, + Runnable onWaitForPermit, + Supplier preAcquireFailure, + ExecutorService remoteTransferRetry, + ScheduledExecutorService scheduler + ) { + super(maxRetryAttempts, availablePermits, priorityPermitAllocation, remoteTransferRetry, scheduler); + this.onWaitForPermit = () -> { + waitedForPermit.set(true); + onWaitForPermit.run(); + }; + this.highPrioritySemaphore = new TestSemaphore( + highPrioritySemaphore.availablePermits(), + "high", + this.onWaitForPermit, + preAcquireFailure + ); + this.lowPrioritySemaphore = new TestSemaphore( + lowPrioritySemaphore.availablePermits(), + "low", + this.onWaitForPermit, + preAcquireFailure + ); + } + + Semaphore acquirePermit(WritePriority writePriority, boolean 
isLowPriorityPermitsConsumable) throws InterruptedException { + TypeSemaphore semaphore = (TypeSemaphore) super.acquirePermit(writePriority, isLowPriorityPermitsConsumable); + if (semaphore == null) { + return null; + } + if (semaphore.getType().equals("high")) { + if (getAvailableHighPriorityPermits() == 0) { + highPermitsFullyConsumed.set(true); + } + if (writePriority == WritePriority.LOW) { + highPermitsConsumedForLowPriority.set(true); + } + } else if (semaphore.getType().equals("low")) { + if (getAvailableLowPriorityPermits() == 0) { + lowPermitsFullyConsumed.set(true); + } + if (writePriority == WritePriority.HIGH) { + lowPermitConsumedForHighPriority.set(true); + } + } + return semaphore; + } + } + + public void testLowPermitConsumptionForHighTask() throws InterruptedException, ExecutionException { + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new TestPermitBackedRetryableFutureUtils( + 3, + Runtime.getRuntime().availableProcessors(), + 0.7, + testExecutor, + scheduler + ); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + List> futures = new ArrayList<>(); + CountDownLatch delayedLatch = new CountDownLatch(1); + for (int reqIdx = 0; reqIdx < (maxHighPermits + lowPermits); reqIdx++) { + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + try { + // Keep the permit acquired + delayedLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "success"; + }, testExecutor)); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture( + supplier, + WritePriority.HIGH, + requestContext + ); + futures.add(resultFuture); + } + // Now release all 
permits + delayedLatch.countDown(); + + CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(CompletableFuture[]::new)).get(); + assertTrue(permitBasedRetryableFutureUtils.lowPermitsFullyConsumed.get()); + assertTrue(permitBasedRetryableFutureUtils.highPermitsFullyConsumed.get()); + assertTrue(permitBasedRetryableFutureUtils.lowPermitConsumedForHighPriority.get()); + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } + + public void testOnlyHighPermitsAcquiredWhenLowTaskInProgress() throws ExecutionException, InterruptedException { + CountDownLatch delayedLatch = new CountDownLatch(1); + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new TestPermitBackedRetryableFutureUtils( + 3, + Runtime.getRuntime().availableProcessors(), + 0.7, + delayedLatch::countDown, + null, + testExecutor, + scheduler + ); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext lowRequestContext = permitBasedRetryableFutureUtils.createRequestContext(); + List> futures = new ArrayList<>(); + + Supplier> lowSupplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + try { + // Keep the permit acquired + delayedLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "success"; + }, testExecutor)); + CompletableFuture lowResultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture( + lowSupplier, + WritePriority.LOW, + lowRequestContext + ); + futures.add(lowResultFuture); + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + for (int reqIdx = 0; reqIdx < maxHighPermits; 
reqIdx++) { + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + try { + // Keep the permit acquired + delayedLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "success"; + }, testExecutor)); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture( + supplier, + WritePriority.HIGH, + requestContext + ); + futures.add(resultFuture); + } + + Thread t = new Thread(() -> { + Supplier> supplier = () -> SocketAccess.doPrivileged( + () -> CompletableFuture.supplyAsync(() -> "success", testExecutor) + ); + + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture( + supplier, + WritePriority.HIGH, + requestContext + ); + futures.add(resultFuture); + }); + t.start(); + t.join(); + + CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(CompletableFuture[]::new)).get(); + assertTrue(permitBasedRetryableFutureUtils.waitedForPermit.get()); + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } + + public void testHighPermitsConsumedForLowTasks() throws ExecutionException, InterruptedException { + CountDownLatch delayedLatch = new CountDownLatch(1); + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new TestPermitBackedRetryableFutureUtils( + 3, + Runtime.getRuntime().availableProcessors(), + 0.7, + delayedLatch::countDown, + null, + testExecutor, + scheduler + ); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + List> futures = new ArrayList<>(); + for (int reqIdx 
= 0; reqIdx < lowPermits; reqIdx++) { + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + try { + // Keep the permit acquired + delayedLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "success"; + }, testExecutor)); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture( + supplier, + WritePriority.LOW, + requestContext + ); + futures.add(resultFuture); + } + + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + try { + // Keep the permit acquired + delayedLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return "success"; + }, testExecutor)); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture( + supplier, + WritePriority.LOW, + requestContext + ); + futures.add(resultFuture); + delayedLatch.countDown(); + + CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(CompletableFuture[]::new)).get(); + assertTrue(permitBasedRetryableFutureUtils.highPermitsConsumedForLowPriority.get()); + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } + + public void testFutureRetryOnSemaphoreFailure() throws ExecutionException, InterruptedException { + int retryCount = 3; + // Multiply by 3 as there are 3 ways in which permit can be acquired. 
+ AtomicInteger failureCount = new AtomicInteger((retryCount - 1) * 3); + AtomicBoolean exhaustRetries = new AtomicBoolean(); + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new TestPermitBackedRetryableFutureUtils( + retryCount, + Runtime.getRuntime().availableProcessors(), + 0.7, + null, + () -> { + if (failureCount.get() > 0 || exhaustRetries.get()) { + failureCount.decrementAndGet(); + return true; + } + return false; + }, + testExecutor, + scheduler + ); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + List> futures = new ArrayList<>(); + Supplier> supplier = () -> SocketAccess.doPrivileged( + () -> CompletableFuture.supplyAsync(() -> "success", testExecutor) + ); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture( + supplier, + WritePriority.HIGH, + requestContext + ); + futures.add(resultFuture); + + CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(CompletableFuture[]::new)).get(); + + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + // Reached here so future executed successfully after retries. 
+ + exhaustRetries.set(true); + resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.HIGH, requestContext); + resultFuture.whenComplete((r, t) -> { + assertNotNull(t); + assertTrue(t instanceof PermitBackedRetryableFutureUtils.RetryableException); + }); + CompletableFuture finalResultFuture = resultFuture; + assertThrows(Exception.class, finalResultFuture::get); + } + + public void testFutureRetryOnExecFailure() throws ExecutionException, InterruptedException { + int retryCount = 3; + AtomicInteger failureCount = new AtomicInteger(retryCount); + AtomicBoolean exhaustRetries = new AtomicBoolean(); + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new TestPermitBackedRetryableFutureUtils( + retryCount, + Runtime.getRuntime().availableProcessors(), + 0.7, + null, + null, + testExecutor, + scheduler + ); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + List> futures = new ArrayList<>(); + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + if (failureCount.get() > 0 || exhaustRetries.get()) { + failureCount.decrementAndGet(); + throw ApiCallAttemptTimeoutException.builder().build(); + } + return "success"; + }, testExecutor)); + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture( + supplier, + WritePriority.HIGH, + requestContext + ); + futures.add(resultFuture); + + CompletableFutureUtils.allOfExceptionForwarded(futures.toArray(CompletableFuture[]::new)).get(); + + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + // 
Reached here so future executed successfully after retries. + + exhaustRetries.set(true); + resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture(supplier, WritePriority.HIGH, requestContext); + resultFuture.whenComplete((r, t) -> { + assertNotNull(t); + assertTrue(t instanceof PermitBackedRetryableFutureUtils.RetryableException); + }); + CompletableFuture finalResultFuture = resultFuture; + assertThrows(Exception.class, finalResultFuture::get); + + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } + + public void testNonRetryableFailure() throws ExecutionException, InterruptedException { + // Throw only once to ensure no retries for unknown exception. + AtomicInteger failureCount = new AtomicInteger(1); + AtomicBoolean exhaustRetries = new AtomicBoolean(); + TestPermitBackedRetryableFutureUtils permitBasedRetryableFutureUtils = new TestPermitBackedRetryableFutureUtils( + 3, + Runtime.getRuntime().availableProcessors(), + 0.7, + null, + null, + testExecutor, + scheduler + ); + int maxHighPermits = permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits(); + int lowPermits = Runtime.getRuntime().availableProcessors() - maxHighPermits; + + PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBasedRetryableFutureUtils.createRequestContext(); + Supplier> supplier = () -> SocketAccess.doPrivileged(() -> CompletableFuture.supplyAsync(() -> { + if (failureCount.get() > 0) { + failureCount.decrementAndGet(); + throw new RuntimeException("Generic exception"); + } + return "success"; + }, testExecutor)); + + CompletableFuture resultFuture = permitBasedRetryableFutureUtils.createPermitBackedRetryableFuture( + supplier, + WritePriority.HIGH, + requestContext + ); + resultFuture.whenComplete((r, t) -> { + assertNotNull(t); + assertFalse(t instanceof 
PermitBackedRetryableFutureUtils.RetryableException); + }); + assertThrows(Exception.class, resultFuture::get); + + assertEquals(maxHighPermits, permitBasedRetryableFutureUtils.getAvailableHighPriorityPermits()); + assertEquals(lowPermits, permitBasedRetryableFutureUtils.getAvailableLowPriorityPermits()); + } +} diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java index 3f341c878c3c7..9888612b444bc 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java @@ -16,5 +16,6 @@ public enum WritePriority { NORMAL, HIGH, - URGENT + URGENT, + LOW } diff --git a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java index 351aec6e3af6c..bfb841307af49 100644 --- a/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java +++ b/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java @@ -437,10 +437,14 @@ private void uploadNewSegments( batchUploadListener.onFailure(ex); }); statsListener.beforeUpload(src); - remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener); + remoteDirectory.copyFrom(storeDirectory, src, IOContext.DEFAULT, aggregatedListener, isLowPriorityUpload()); } } + private boolean isLowPriorityUpload() { + return isLocalOrSnapshotRecovery(); + } + /** * Whether to upload a file or not depending on whether file is in excluded list or has been already uploaded. 
* diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 345583bbbd1be..4fef8c6179c8e 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -323,11 +323,12 @@ public boolean copyFrom( String remoteFileName, IOContext context, Runnable postUploadRunner, - ActionListener listener + ActionListener listener, + boolean lowPriorityUpload ) { if (blobContainer instanceof AsyncMultiStreamBlobContainer) { try { - uploadBlob(from, src, remoteFileName, context, postUploadRunner, listener); + uploadBlob(from, src, remoteFileName, context, postUploadRunner, listener, lowPriorityUpload); } catch (Exception e) { listener.onFailure(e); } @@ -342,7 +343,8 @@ private void uploadBlob( String remoteFileName, IOContext ioContext, Runnable postUploadRunner, - ActionListener listener + ActionListener listener, + boolean lowPriorityUpload ) throws Exception { long expectedChecksum = calculateChecksumOfChecksum(from, src); long contentLength; @@ -358,7 +360,7 @@ private void uploadBlob( remoteFileName, contentLength, true, - WritePriority.NORMAL, + lowPriorityUpload ? 
WritePriority.LOW : WritePriority.NORMAL, (size, position) -> uploadRateLimiter.apply(new OffsetRangeIndexInputStream(from.openInput(src, ioContext), size, position)), expectedChecksum, remoteIntegrityEnabled diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index ec1163fe91b6c..8c0ecb4cc783a 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -453,7 +453,7 @@ public IndexInput openInput(String name, IOContext context) throws IOException { * @param context IOContext to be used to open IndexInput of file during remote upload * @param listener Listener to handle upload callback events */ - public void copyFrom(Directory from, String src, IOContext context, ActionListener listener) { + public void copyFrom(Directory from, String src, IOContext context, ActionListener listener, boolean lowPriorityUpload) { try { final String remoteFileName = getNewRemoteSegmentFilename(src); boolean uploaded = remoteDataDirectory.copyFrom(from, src, remoteFileName, context, () -> { @@ -462,7 +462,7 @@ public void copyFrom(Directory from, String src, IOContext context, ActionListen } catch (IOException e) { throw new RuntimeException("Exception in segment postUpload for file " + src, e); } - }, listener); + }, listener, lowPriorityUpload); if (uploaded == false) { copyFrom(from, src, src, context); listener.onResponse(null); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java index 9e38e1749d434..ee81369725e6f 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteDirectoryTests.java @@ -104,7 +104,8 @@ public void onResponse(Void t) 
{ public void onFailure(Exception e) { fail("Listener responded with exception" + e); } - } + }, + false ); assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); assertTrue(postUploadInvoked.get()); @@ -141,7 +142,8 @@ public void onResponse(Void t) { public void onFailure(Exception e) { countDownLatch.countDown(); } - } + }, + false ); assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); assertFalse(postUploadInvoked.get()); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index b1e2028d761f0..567199cf64cd8 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -639,7 +639,7 @@ public void onResponse(Void unused) { @Override public void onFailure(Exception e) {} }; - remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener); + remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener, false); assertTrue(latch.await(5000, TimeUnit.SECONDS)); assertTrue(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename)); storeDirectory.close(); @@ -683,7 +683,7 @@ public void onFailure(Exception e) { latch.countDown(); } }; - remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener); + remoteSegmentStoreDirectory.copyFrom(storeDirectory, filename, IOContext.DEFAULT, completionListener, false); assertTrue(latch.await(5000, TimeUnit.SECONDS)); assertFalse(remoteSegmentStoreDirectory.getSegmentsUploadedToRemoteStore().containsKey(filename));