diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java index da2c6e8c1b0ee..b001ac878f5df 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -249,7 +249,21 @@ protected S3Repository createRepository( ClusterService clusterService, RecoverySettings recoverySettings ) { - return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) { + return new S3Repository( + metadata, + registry, + service, + clusterService, + recoverySettings, + null, + null, + null, + null, + null, + false, + null, + null + ) { @Override public BlobStore blobStore() { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 25f361b40636e..8424e2ee7fecc 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -88,6 +88,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; import org.opensearch.repositories.s3.async.UploadRequest; import org.opensearch.repositories.s3.utils.HttpRangeUtils; @@ -193,32 +194,6 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp blobStore.isUploadRetryEnabled() ); try { - if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) { - StreamContext streamContext = SocketAccess.doPrivileged( - () -> writeContext.getStreamProvider(uploadRequest.getContentLength()) - ); - InputStreamContainer inputStream = streamContext.provideStream(0); - try { - executeMultipartUpload( - blobStore, - uploadRequest.getKey(), - inputStream.getInputStream(), - uploadRequest.getContentLength() - ); - completionListener.onResponse(null); - } catch (Exception ex) { - logger.error( - () -> new ParameterizedMessage( - "Failed to upload large file {} of size {} ", - uploadRequest.getKey(), - uploadRequest.getContentLength() - ), - ex - ); - completionListener.onFailure(ex); - } - return; - } long partSize = blobStore.getAsyncTransferManager() .calculateOptimalPartSize(writeContext.getFileSize(), writeContext.getWritePriority(), blobStore.isUploadRetryEnabled()); StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize)); @@ -232,16 +207,28 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp } else { s3AsyncClient = amazonS3Reference.get().client(); } - CompletableFuture completableFuture = blobStore.getAsyncTransferManager() - .uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher()); - completableFuture.whenComplete((response, throwable) -> { - if (throwable == null) { - completionListener.onResponse(response); + + if (writeContext.getWritePriority() != WritePriority.URGENT && writeContext.getWritePriority() != WritePriority.HIGH) { + if (writeContext.getWritePriority() == WritePriority.LOW) { + blobStore.getLowPrioritySizeBasedBlockingQ() + .produce( + new SizeBasedBlockingQ.Item( + writeContext.getFileSize(), + () -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener) + ) + ); } else { - Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable; - completionListener.onFailure(ex); + blobStore.getOtherPrioritySizeBasedBlockingQ() + .produce( + new SizeBasedBlockingQ.Item( + writeContext.getFileSize(), + () -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener) + ) + ); } - }); + } else { + createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener); + } } } catch (Exception e) { logger.info("exception error from blob container for file {}", writeContext.getFileName()); @@ -249,6 +236,24 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp } } + private CompletableFuture createFileCompletableFuture( + S3AsyncClient s3AsyncClient, + UploadRequest uploadRequest, + StreamContext streamContext, + ActionListener completionListener + ) { + CompletableFuture completableFuture = blobStore.getAsyncTransferManager() + .uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher()); + return completableFuture.whenComplete((response, throwable) -> { + if (throwable == null) { + completionListener.onResponse(response); + } else { + Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable; + completionListener.onFailure(ex); + } + }); + } + @ExperimentalApi @Override public void readBlobAsync(String blobName, ActionListener listener) { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java index fc70fbb0db00e..e1a8fff87bc3e 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java @@ -45,6 +45,7 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferManager; +import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; import java.io.IOException; import java.util.Collections; @@ -94,6 +95,8 @@ class S3BlobStore implements BlobStore { private final AsyncExecutorContainer priorityExecutorBuilder; private final AsyncExecutorContainer normalExecutorBuilder; private final boolean multipartUploadEnabled; + private final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ; + private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; S3BlobStore( S3Service service, @@ -109,7 +112,9 @@ class S3BlobStore implements BlobStore { AsyncTransferManager asyncTransferManager, AsyncExecutorContainer urgentExecutorBuilder, AsyncExecutorContainer priorityExecutorBuilder, - AsyncExecutorContainer normalExecutorBuilder + AsyncExecutorContainer normalExecutorBuilder, + SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ, + SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ ) { this.service = service; this.s3AsyncService = s3AsyncService; @@ -128,6 +133,8 @@ class S3BlobStore implements BlobStore { // Settings to initialize blobstore with. this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings()); this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings()); + this.otherPrioritySizeBasedBlockingQ = otherPrioritySizeBasedBlockingQ; + this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ; } @Override @@ -184,6 +191,14 @@ public int getBulkDeletesSize() { return bulkDeletesSize; } + public SizeBasedBlockingQ getOtherPrioritySizeBasedBlockingQ() { + return otherPrioritySizeBasedBlockingQ; + } + + public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() { + return lowPrioritySizeBasedBlockingQ; + } + @Override public BlobContainer blobContainer(BlobPath path) { return new S3BlobContainer(path, this); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index f8481e3a9402c..e486b1ee8b944 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -49,6 +49,7 @@ import org.opensearch.common.settings.SecureSetting; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.Strings; import org.opensearch.core.common.settings.SecureString; @@ -63,6 +64,7 @@ import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository; import org.opensearch.repositories.s3.async.AsyncExecutorContainer; import org.opensearch.repositories.s3.async.AsyncTransferManager; +import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; import org.opensearch.snapshots.SnapshotId; import org.opensearch.snapshots.SnapshotInfo; import org.opensearch.threadpool.Scheduler; @@ -210,6 +212,16 @@ class S3Repository extends MeteredBlobStoreRepository { Setting.Property.NodeScope ); + /** + * Number of transfer queue consumers + */ + public static Setting S3_TRANSFER_QUEUE_CONSUMERS = new Setting<>( + "s3_transfer_queue_consumers", + (s) -> Integer.toString(Math.max(5, OpenSearchExecutors.allocatedProcessors(s))), + (s) -> Setting.parseInt(s, 5, "s3_transfer_queue_consumers"), + Setting.Property.NodeScope + ); + /** * Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g. */ @@ -268,6 +280,8 @@ class S3Repository extends MeteredBlobStoreRepository { private final AsyncExecutorContainer priorityExecutorBuilder; private final AsyncExecutorContainer normalExecutorBuilder; private final Path pluginConfigPath; + private final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ; + private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; private volatile int bulkDeletesSize; @@ -283,7 +297,9 @@ class S3Repository extends MeteredBlobStoreRepository { final AsyncExecutorContainer priorityExecutorBuilder, final AsyncExecutorContainer normalExecutorBuilder, final S3AsyncService s3AsyncService, - final boolean multipartUploadEnabled + final boolean multipartUploadEnabled, + final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ, + final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ ) { this( metadata, @@ -297,7 +313,9 @@ class S3Repository extends MeteredBlobStoreRepository { normalExecutorBuilder, s3AsyncService, multipartUploadEnabled, - Path.of("") + Path.of(""), + otherPrioritySizeBasedBlockingQ, + lowPrioritySizeBasedBlockingQ ); } @@ -316,7 +334,9 @@ class S3Repository extends MeteredBlobStoreRepository { final AsyncExecutorContainer normalExecutorBuilder, final S3AsyncService s3AsyncService, final boolean multipartUploadEnabled, - Path pluginConfigPath + Path pluginConfigPath, + final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ, + final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ ) { super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata)); this.service = service; @@ -327,6 +347,8 @@ class S3Repository extends MeteredBlobStoreRepository { this.urgentExecutorBuilder = urgentExecutorBuilder; this.priorityExecutorBuilder = priorityExecutorBuilder; this.normalExecutorBuilder = normalExecutorBuilder; + this.otherPrioritySizeBasedBlockingQ = otherPrioritySizeBasedBlockingQ; + this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ; validateRepositoryMetadata(metadata); readRepositoryMetadata(); @@ -389,7 +411,9 @@ protected S3BlobStore createBlobStore() { asyncUploadUtils, urgentExecutorBuilder, priorityExecutorBuilder, - normalExecutorBuilder + normalExecutorBuilder, + otherPrioritySizeBasedBlockingQ, + lowPrioritySizeBasedBlockingQ ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index 63872f0b98cd8..84b7134321274 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -41,6 +41,9 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.common.util.CollectionUtils; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; @@ -54,6 +57,7 @@ import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; import org.opensearch.repositories.s3.async.AsyncTransferManager; import org.opensearch.repositories.s3.async.PermitBackedRetryableFutureUtils; +import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; import org.opensearch.script.ScriptService; import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.FixedExecutorBuilder; @@ -86,6 +90,8 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private static final String FUTURE_COMPLETION = "future_completion"; private static final String REMOTE_TRANSFER_RETRY = "remote_transfer_retry"; private static final String STREAM_READER = "stream_reader"; + private static final String LOW_TRANSFER_QUEUE_CONSUMER = "low_transfer_queue_consumer"; + private static final String OTHER_TRANSFER_QUEUE_CONSUMER = "other_transfer_queue_consumer"; protected final S3Service service; private final S3AsyncService s3AsyncService; @@ -96,7 +102,11 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private AsyncExecutorContainer priorityExecutorBuilder; private AsyncExecutorContainer normalExecutorBuilder; private ExecutorService remoteTransferRetryPool; + private ExecutorService lowTransferQConsumerService; + private ExecutorService otherTransferQConsumerService; private ScheduledExecutorService scheduler; + private SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ; + private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; public S3RepositoryPlugin(final Settings settings, final Path configPath) { this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath)); @@ -134,9 +144,36 @@ public List> getExecutorBuilders(Settings settings) { TimeValue.timeValueMinutes(5) ) ); + executorBuilders.add( + new FixedExecutorBuilder( + settings, + LOW_TRANSFER_QUEUE_CONSUMER, + lowPriorityTransferQConsumers(settings), + 10, + "thread_pool." + LOW_TRANSFER_QUEUE_CONSUMER + ) + ); + executorBuilders.add( + new FixedExecutorBuilder( + settings, + OTHER_TRANSFER_QUEUE_CONSUMER, + otherPriorityTransferQConsumers(settings), + 10, + "thread_pool." + OTHER_TRANSFER_QUEUE_CONSUMER + ) + ); return executorBuilders; } + private int lowPriorityTransferQConsumers(Settings settings) { + double lowPriorityAllocation = ((double) (100 - S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(settings))) / 100; + return Math.max(2, (int) (lowPriorityAllocation * S3Repository.S3_TRANSFER_QUEUE_CONSUMERS.get(settings))); + } + + private int otherPriorityTransferQConsumers(Settings settings) { + return S3Repository.S3_TRANSFER_QUEUE_CONSUMERS.get(settings); + } + static int halfNumberOfProcessors(int numberOfProcessors) { return (numberOfProcessors + 1) / 2; } @@ -204,8 +241,32 @@ public Collection createComponents( new AsyncTransferEventLoopGroup(normalEventLoopThreads) ); this.remoteTransferRetryPool = threadPool.executor(REMOTE_TRANSFER_RETRY); + this.lowTransferQConsumerService = threadPool.executor(LOW_TRANSFER_QUEUE_CONSUMER); + this.otherTransferQConsumerService = threadPool.executor(OTHER_TRANSFER_QUEUE_CONSUMER); this.scheduler = threadPool.scheduler(); - return Collections.emptyList(); + int otherPriorityConsumers = otherPriorityTransferQConsumers(clusterService.getSettings()); + this.otherPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( + new ByteSizeValue(otherPriorityConsumers * 10L, ByteSizeUnit.GB), + otherTransferQConsumerService, + otherPriorityConsumers + ); + int lowPriorityConsumers = lowPriorityTransferQConsumers(clusterService.getSettings()); + LowPrioritySizeBasedBlockingQ lowPrioritySizeBasedBlockingQ = new LowPrioritySizeBasedBlockingQ( + new ByteSizeValue(lowPriorityConsumers * 10L, ByteSizeUnit.GB), + lowTransferQConsumerService, + lowPriorityConsumers + ); + this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ; + + return CollectionUtils.arrayAsArrayList(this.otherPrioritySizeBasedBlockingQ, lowPrioritySizeBasedBlockingQ); + } + + // New class because in core, components are injected via guice only by instance creation due to which + // same binding types fail. + private static final class LowPrioritySizeBasedBlockingQ extends SizeBasedBlockingQ { + public LowPrioritySizeBasedBlockingQ(ByteSizeValue capacity, ExecutorService executorService, int consumers) { + super(capacity, executorService, consumers); + } } // proxy method for testing @@ -223,13 +284,12 @@ protected S3Repository createRepository( urgentExecutorBuilder.getStreamReader(), new PermitBackedRetryableFutureUtils<>( S3Repository.S3_MAX_TRANSFER_RETRIES.get(clusterService.getSettings()), - // High permit allocation because each op acquiring permit performs disk IO, computation and network IO. - Math.max(allocatedProcessors(clusterService.getSettings()) * 5, 10), + // High number of permit allocation because each op acquiring permit performs disk IO, computation and network IO. + Math.max(allocatedProcessors(clusterService.getSettings()) * 4, 10), ((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings())) / 100, remoteTransferRetryPool, scheduler ) - ); return new S3Repository( metadata, @@ -243,7 +303,9 @@ protected S3Repository createRepository( normalExecutorBuilder, s3AsyncService, S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING.get(clusterService.getSettings()), - configPath + configPath, + otherPrioritySizeBasedBlockingQ, + lowPrioritySizeBasedBlockingQ ); } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java index f3955c9d69186..24bcf40602449 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java @@ -42,7 +42,7 @@ */ public class AsyncPartsHandler { - private static Logger log = LogManager.getLogger(AsyncPartsHandler.class); + private static final Logger log = LogManager.getLogger(AsyncPartsHandler.class); /** * Uploads parts of the upload multipart request* @@ -72,7 +72,7 @@ public static List> uploadParts( StatsMetricPublisher statsMetricPublisher, boolean uploadRetryEnabled, PermitBackedRetryableFutureUtils permitBackedRetryableFutureUtils - ) throws InterruptedException { + ) { List> futures = new ArrayList<>(); PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBackedRetryableFutureUtils.createRequestContext(); for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) { @@ -109,12 +109,17 @@ public static List> uploadParts( ); }; - CompletableFuture retryableFuture = permitBackedRetryableFutureUtils.createPermitBackedRetryableFuture( - partFutureSupplier, - uploadRequest.getWritePriority(), - requestContext - ); - futures.add(retryableFuture); + CompletableFuture partFuture; + if (uploadRequest.getWritePriority() == WritePriority.HIGH || uploadRequest.getWritePriority() == WritePriority.URGENT) { + partFuture = partFutureSupplier.get(); + } else { + partFuture = permitBackedRetryableFutureUtils.createPermitBackedRetryableFuture( + partFutureSupplier, + uploadRequest.getWritePriority(), + requestContext + ); + } + futures.add(partFuture); } return futures; diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index 4be3cb1a84e20..733c240f77514 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -400,11 +400,16 @@ private void uploadInOneChunk( }); PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBackedRetryableFutureUtils.createRequestContext(); - CompletableFuture putObjectFuture = permitBackedRetryableFutureUtils.createPermitBackedRetryableFuture( - putObjectFutureSupplier, - uploadRequest.getWritePriority(), - requestContext - ); + CompletableFuture putObjectFuture; + if (uploadRequest.getWritePriority() == WritePriority.HIGH || uploadRequest.getWritePriority() == WritePriority.URGENT) { + putObjectFuture = putObjectFutureSupplier.get(); + } else { + putObjectFuture = permitBackedRetryableFutureUtils.createPermitBackedRetryableFuture( + putObjectFutureSupplier, + uploadRequest.getWritePriority(), + requestContext + ); + } CompletableFutureUtils.forwardExceptionTo(returnFuture, putObjectFuture); CompletableFutureUtils.forwardResultTo(putObjectFuture, returnFuture); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtils.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtils.java index 25a45fb28c23f..e14db16030389 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtils.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/PermitBackedRetryableFutureUtils.java @@ -24,7 +24,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.function.Supplier; @@ -44,7 +43,6 @@ public class PermitBackedRetryableFutureUtils { private final int highPriorityPermits; private final int maxRetryAttempts; private static final int RETRY_BASE_INTERVAL_MILLIS = 1_000; - private final AtomicBoolean lowPriorityTransferProgress; private final ExecutorService remoteTransferRetryPool; private final ScheduledExecutorService scheduler; @@ -68,7 +66,6 @@ public PermitBackedRetryableFutureUtils( this.highPrioritySemaphore = new TypeSemaphore(highPriorityPermits, "high"); this.lowPriorityPermits = availablePermits - highPriorityPermits; this.lowPrioritySemaphore = new TypeSemaphore(lowPriorityPermits, "low"); - this.lowPriorityTransferProgress = new AtomicBoolean(); this.remoteTransferRetryPool = remoteTransferRetryPool; this.scheduler = scheduler; } @@ -140,15 +137,19 @@ public RequestContext createRequestContext() { */ static class RetryableException extends CompletionException { private final Iterator retryBackoffDelayIterator; + private final Semaphore semaphore; - public RetryableException(Iterator retryBackoffDelayIterator, String message, Throwable cause) { + public RetryableException(Iterator retryBackoffDelayIterator, String message, Throwable cause, Semaphore semaphore) { super(message, cause); this.retryBackoffDelayIterator = retryBackoffDelayIterator; + this.semaphore = semaphore; } public RetryableException(Iterator retryBackoffDelayIterator) { this.retryBackoffDelayIterator = retryBackoffDelayIterator; + this.semaphore = null; } + } /** @@ -205,7 +206,7 @@ public CompletableFuture createPermitBackedRetryableFuture( TimeValue.timeValueMillis(RETRY_BASE_INTERVAL_MILLIS), maxRetryAttempts ).iterator(); - Supplier> permitBackedFutureSupplier = createPermitBackedFutureSupplier( + Function> permitBackedFutureSupplier = createPermitBackedFutureSupplier( retryBackoffDelayIterator, requestContext.lowPriorityPermitsConsumable, futureSupplier, @@ -214,7 +215,7 @@ public CompletableFuture createPermitBackedRetryableFuture( CompletableFuture permitBackedFuture; try { - permitBackedFuture = permitBackedFutureSupplier.get(); + permitBackedFuture = permitBackedFutureSupplier.apply(null); } catch (RetryableException re) { // We need special handling when an exception occurs during first future creation itself. permitBackedFuture = retry(re, permitBackedFutureSupplier, retryBackoffDelayIterator); @@ -222,13 +223,13 @@ public CompletableFuture createPermitBackedRetryableFuture( return CompletableFuture.failedFuture(ex); } - return flatten( + return unwrap( permitBackedFuture.thenApply(CompletableFuture::completedFuture) .exceptionally(t -> retry(t, permitBackedFutureSupplier, retryBackoffDelayIterator)) ); } - private static CompletableFuture flatten( + private static CompletableFuture unwrap( CompletableFuture> completableCompletable ) { return completableCompletable.thenCompose(Function.identity()); @@ -236,7 +237,7 @@ private static CompletableFuture flatten( private CompletableFuture retry( Throwable ex, - Supplier> futureSupplier, + Function> futureSupplier, Iterator retryBackoffDelayIterator ) { if (!(ex instanceof RetryableException)) { @@ -248,10 +249,10 @@ private CompletableFuture retry( return CompletableFuture.failedFuture(ex); } - return flatten( - flatten( + return unwrap( + unwrap( CompletableFuture.supplyAsync( - futureSupplier, + () -> futureSupplier.apply(retryableException.semaphore), new DelayedExecutor( retryableException.retryBackoffDelayIterator.next().millis(), TimeUnit.MILLISECONDS, @@ -298,21 +299,22 @@ Semaphore acquirePermit(WritePriority writePriority, boolean isLowPriorityPermit return null; } - private Supplier> createPermitBackedFutureSupplier( + private Function> createPermitBackedFutureSupplier( Iterator retryBackoffDelayIterator, boolean lowPriorityPermitsConsumable, Supplier> futureSupplier, WritePriority writePriority ) { - return () -> { - Semaphore semaphore; - try { - semaphore = acquirePermit(writePriority, lowPriorityPermitsConsumable); - if (semaphore == null) { - throw new RetryableException(retryBackoffDelayIterator); + return (semaphore) -> { + if (semaphore == null) { + try { + semaphore = acquirePermit(writePriority, lowPriorityPermitsConsumable); + if (semaphore == null) { + throw new RetryableException(retryBackoffDelayIterator); + } + } catch (InterruptedException e) { + throw new CompletionException(e); } - } catch (InterruptedException e) { - throw new CompletionException(e); } CompletableFuture future; @@ -324,18 +326,26 @@ private Supplier> createPermitBackedFutureSupplier throw new RuntimeException(ex); } + Semaphore finalSemaphore = semaphore; return future.handle((resp, t) -> { try { if (t != null) { Throwable ex = ExceptionsHelper.unwrap(t, SdkException.class); if (ex != null) { - throw new RetryableException(retryBackoffDelayIterator, t.getMessage(), t); + throw new RetryableException(retryBackoffDelayIterator, t.getMessage(), t, finalSemaphore); } throw new CompletionException(t); } + finalSemaphore.release(); return resp; - } finally { - semaphore.release(); + } catch (RetryableException rex) { + if (!retryBackoffDelayIterator.hasNext()) { + finalSemaphore.release(); + } + throw rex; + } catch (Exception ex) { + finalSemaphore.release(); + throw ex; } }); }; diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java new file mode 100644 index 0000000000000..1c206b0a78e91 --- /dev/null +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQ.java @@ -0,0 +1,180 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.async; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.lucene.store.AlreadyClosedException; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.core.common.unit.ByteSizeValue; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class SizeBasedBlockingQ extends AbstractLifecycleComponent { + private static final Logger log = LogManager.getLogger(SizeBasedBlockingQ.class); + + protected final LinkedBlockingQueue queue; + protected final Lock lock; + protected final Condition notFull; + protected final Condition notEmpty; + + protected final AtomicLong currentSize; + protected final ByteSizeValue capacity; + protected final AtomicBoolean closed; + protected final ExecutorService executorService; + protected final int consumers; + + public SizeBasedBlockingQ(ByteSizeValue capacity, ExecutorService executorService, int consumers) { + this.queue = new LinkedBlockingQueue<>(); + this.lock = new ReentrantLock(); + this.notFull = lock.newCondition(); + this.notEmpty = lock.newCondition(); + this.currentSize = new AtomicLong(); + this.capacity = capacity; + this.closed = new AtomicBoolean(); + this.executorService = executorService; + this.consumers = consumers; + } + + @Override + protected void doStart() { + for (int worker = 0; worker < consumers; worker++) { + Thread consumer = new Consumer(queue, currentSize, lock, notFull, notEmpty, closed); + executorService.submit(consumer); + } + } + + public void produce(Item item) throws InterruptedException { + if (item == null || item.size <= 0) { + throw new IllegalStateException("Invalid item input to produce."); + } + final Lock lock = this.lock; + final AtomicLong currentSize = this.currentSize; + lock.lock(); + try { + if (closed.get()) { + throw new AlreadyClosedException("Transfer queue is already closed."); + } + while (currentSize.get() + item.size >= capacity.getBytes()) { + notFull.await(); + } + if (closed.get()) { + throw new AlreadyClosedException("Transfer queue is already closed."); + } + queue.put(item); + currentSize.addAndGet(item.size); + notEmpty.signalAll(); + } finally { + lock.unlock(); + } + } + + public int getSize() { + return queue.size(); + } + + protected static class Consumer extends Thread { + private final LinkedBlockingQueue queue; + private final Lock lock; + private final Condition notFull; + private final Condition notEmpty; + private final AtomicLong currentSize; + private final AtomicBoolean closed; + + public Consumer( + LinkedBlockingQueue queue, + AtomicLong currentSize, + Lock lock, + Condition notFull, + Condition notEmpty, + AtomicBoolean closed + ) { + this.queue = queue; + this.lock = lock; + this.notEmpty = notEmpty; + this.notFull = notFull; + this.currentSize = currentSize; + this.closed = closed; + } + + @Override + public void run() { + while (true) { + try { + consume(); + } catch (AlreadyClosedException ex) { + return; + } catch (Exception ex) { + log.error("Failed to consume transfer event", ex); + } + } + } + + private void consume() throws InterruptedException { + final Lock lock = this.lock; + final AtomicLong currentSize = this.currentSize; + lock.lock(); + Item item; + try { + if (closed.get()) { + throw new AlreadyClosedException("transfer queue closed"); + } + while (currentSize.get() == 0) { + notEmpty.await(); + if (closed.get()) { + throw new AlreadyClosedException("transfer queue closed"); + } + } + + item = queue.take(); + currentSize.addAndGet(-item.size); + notFull.signalAll(); + } finally { + lock.unlock(); + } + + try { + item.consumable.run(); + } catch (Exception ex) { + log.error("Exception on executing item consumable", ex); + } + } + + } + + public static class Item { + private final long size; + private final Runnable consumable; + + public Item(long size, Runnable consumable) { + this.size = size; + this.consumable = consumable; + } + } + + @Override + protected void doStop() {} + + @Override + protected void doClose() { + lock.lock(); + closed.set(true); + try { + notEmpty.signalAll(); + } finally { + lock.unlock(); + } + } +} diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java index f84d953baae8e..16dbe05dcaee8 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java @@ -303,7 +303,21 @@ protected S3Repository createRepository( ClusterService clusterService, RecoverySettings recoverySettings ) { - return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) { + return new S3Repository( + metadata, + registry, + service, + clusterService, + recoverySettings, + null, + null, + null, + null, + null, + false, + null, + null + ) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 3bc9f70c0b603..c4071ba1ba05d 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -48,6 +48,7 @@ import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; import org.opensearch.repositories.s3.async.AsyncTransferManager; import org.opensearch.repositories.s3.async.PermitBackedRetryableFutureUtils; +import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.Scheduler; import org.junit.After; @@ -92,9 +93,12 @@ public class S3BlobContainerMockClientTests extends OpenSearchTestCase implement private ExecutorService futureCompletionService; private ExecutorService streamReaderService; private ExecutorService remoteTransferRetry; + private ExecutorService transferQueueConsumerService; private ScheduledExecutorService scheduler; private AsyncTransferEventLoopGroup transferNIOGroup; private S3BlobContainer blobContainer; + private SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ; + private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; static class MockS3AsyncService extends S3AsyncService { @@ -367,8 +371,21 @@ public void setUp() throws Exception { futureCompletionService = Executors.newSingleThreadExecutor(); streamReaderService = Executors.newSingleThreadExecutor(); remoteTransferRetry = Executors.newFixedThreadPool(20); + transferQueueConsumerService = Executors.newFixedThreadPool(20); scheduler = new Scheduler.SafeScheduledThreadPoolExecutor(1); transferNIOGroup = new AsyncTransferEventLoopGroup(1); + otherPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( + new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB), + transferQueueConsumerService, + 10 + ); + lowPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( + new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB), + transferQueueConsumerService, + 5 + ); + otherPrioritySizeBasedBlockingQ.start(); + lowPrioritySizeBasedBlockingQ.start(); blobContainer = createBlobContainer(); super.setUp(); } @@ -380,6 +397,9 @@ public void tearDown() throws Exception { futureCompletionService.shutdown(); streamReaderService.shutdown(); remoteTransferRetry.shutdown(); + transferQueueConsumerService.shutdown(); + otherPrioritySizeBasedBlockingQ.close(); + lowPrioritySizeBasedBlockingQ.close(); scheduler.shutdown(); transferNIOGroup.close(); super.tearDown(); @@ -430,7 +450,9 @@ private S3BlobStore createBlobStore() { ), asyncExecutorContainer, asyncExecutorContainer, - asyncExecutorContainer + asyncExecutorContainer, + otherPrioritySizeBasedBlockingQ, + lowPrioritySizeBasedBlockingQ ); } @@ -578,114 +600,4 @@ private int calculateNumberOfParts(long contentLength, long partSize) { return (int) ((contentLength % partSize) == 0 ? contentLength / partSize : (contentLength / partSize) + 1); } - public void testFailureWhenLargeFileRedirected() throws IOException, ExecutionException, InterruptedException { - testLargeFilesRedirectedToSlowSyncClient(true); - } - - public void testLargeFileRedirected() throws IOException, ExecutionException, InterruptedException { - testLargeFilesRedirectedToSlowSyncClient(false); - } - - private void testLargeFilesRedirectedToSlowSyncClient(boolean expectException) throws IOException, InterruptedException { - final ByteSizeValue partSize = new ByteSizeValue(1024, ByteSizeUnit.MB); - - int numberOfParts = 20; - final long lastPartSize = new ByteSizeValue(20, ByteSizeUnit.MB).getBytes(); - final long blobSize = ((numberOfParts - 1) * partSize.getBytes()) + lastPartSize; - CountDownLatch countDownLatch = new CountDownLatch(1); - AtomicReference exceptionRef = new AtomicReference<>(); - ActionListener completionListener = ActionListener.wrap(resp -> { countDownLatch.countDown(); }, ex -> { - exceptionRef.set(ex); - countDownLatch.countDown(); - }); - - final String bucketName = randomAlphaOfLengthBetween(1, 10); - - final BlobPath blobPath = new BlobPath(); - if (randomBoolean()) { - IntStream.of(randomIntBetween(1, 5)).forEach(value -> blobPath.add("path_" + value)); - } - - final long bufferSize = ByteSizeUnit.MB.toBytes(randomIntBetween(5, 1024)); - - final S3BlobStore blobStore = mock(S3BlobStore.class); - when(blobStore.bucket()).thenReturn(bucketName); - when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); - when(blobStore.bufferSizeInBytes()).thenReturn(bufferSize); - - final boolean serverSideEncryption = randomBoolean(); - when(blobStore.serverSideEncryption()).thenReturn(serverSideEncryption); - - final StorageClass storageClass = randomFrom(StorageClass.values()); - when(blobStore.getStorageClass()).thenReturn(storageClass); - when(blobStore.isRedirectLargeUploads()).thenReturn(true); - - final ObjectCannedACL cannedAccessControlList = randomBoolean() ? randomFrom(ObjectCannedACL.values()) : null; - if (cannedAccessControlList != null) { - when(blobStore.getCannedACL()).thenReturn(cannedAccessControlList); - } - - final S3Client client = mock(S3Client.class); - final AmazonS3Reference clientReference = Mockito.spy(new AmazonS3Reference(client)); - doNothing().when(clientReference).close(); - when(blobStore.clientReference()).thenReturn(clientReference); - final CreateMultipartUploadResponse createMultipartUploadResponse = CreateMultipartUploadResponse.builder() - .uploadId(randomAlphaOfLength(10)) - .build(); - when(client.createMultipartUpload(any(CreateMultipartUploadRequest.class))).thenReturn(createMultipartUploadResponse); - if (expectException) { - when(client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))).thenThrow( - SdkException.create("Expected upload part request to fail", new RuntimeException()) - ); - } else { - when(client.uploadPart(any(UploadPartRequest.class), any(RequestBody.class))).thenReturn(UploadPartResponse.builder().build()); - } - - // Fail the completion request - when(client.completeMultipartUpload(any(CompleteMultipartUploadRequest.class))).thenReturn( - CompleteMultipartUploadResponse.builder().build() - ); - when(client.abortMultipartUpload(any(AbortMultipartUploadRequest.class))).thenReturn( - AbortMultipartUploadResponse.builder().build() - ); - - List openInputStreams = new ArrayList<>(); - final S3BlobContainer s3BlobContainer = Mockito.spy(new S3BlobContainer(blobPath, blobStore)); - s3BlobContainer.asyncBlobUpload(new WriteContext("write_large_blob", new StreamContextSupplier() { - @Override - public StreamContext supplyStreamContext(long partSize) { - return new StreamContext(new CheckedTriFunction() { - @Override - public InputStreamContainer apply(Integer partNo, Long size, Long position) throws IOException { - InputStream inputStream = new OffsetRangeIndexInputStream(new ZeroIndexInput("desc", blobSize), size, position); - openInputStreams.add(inputStream); - return new InputStreamContainer(inputStream, size, position); - } - }, partSize, calculateLastPartSize(blobSize, partSize), calculateNumberOfParts(blobSize, partSize)); - } - }, blobSize, false, WritePriority.HIGH, uploadSuccess -> { assertTrue(uploadSuccess); }, false, null), completionListener); - - assertTrue(countDownLatch.await(5000, TimeUnit.SECONDS)); - if (expectException) { - assertNotNull(exceptionRef.get()); - } else { - assertNull(exceptionRef.get()); - } - verify(s3BlobContainer, times(1)).executeMultipartUpload(any(S3BlobStore.class), anyString(), any(InputStream.class), anyLong()); - - if (expectException) { - verify(client, times(1)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); - } else { - verify(client, times(0)).abortMultipartUpload(any(AbortMultipartUploadRequest.class)); - } - - openInputStreams.forEach(inputStream -> { - try { - inputStream.close(); - } catch (IOException ex) { - logger.error("Error closing input stream"); - } - }); - } - } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index 1772b5cf4ad48..b249ed695846f 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -69,6 +69,7 @@ import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup; import org.opensearch.repositories.s3.async.AsyncTransferManager; import org.opensearch.repositories.s3.async.PermitBackedRetryableFutureUtils; +import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -117,8 +118,11 @@ public class S3BlobContainerRetriesTests extends AbstractBlobContainerRetriesTes private ExecutorService futureCompletionService; private ExecutorService streamReaderService; private ExecutorService remoteTransferRetry; + private ExecutorService transferQueueConsumerService; private ScheduledExecutorService scheduler; private AsyncTransferEventLoopGroup transferNIOGroup; + private SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ; + private SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ; @Before public void setUp() throws Exception { @@ -130,8 +134,20 @@ public void setUp() throws Exception { streamReaderService = Executors.newSingleThreadExecutor(); transferNIOGroup = new AsyncTransferEventLoopGroup(1); remoteTransferRetry = Executors.newFixedThreadPool(20); + transferQueueConsumerService = Executors.newFixedThreadPool(2); scheduler = new ScheduledThreadPoolExecutor(1); - + otherPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( + new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB), + transferQueueConsumerService, + 2 + ); + lowPrioritySizeBasedBlockingQ = new SizeBasedBlockingQ( + new ByteSizeValue(Runtime.getRuntime().availableProcessors() * 5L, ByteSizeUnit.GB), + transferQueueConsumerService, + 2 + ); + otherPrioritySizeBasedBlockingQ.start(); + lowPrioritySizeBasedBlockingQ.start(); // needed by S3AsyncService SocketAccess.doPrivileged(() -> System.setProperty("opensearch.path.conf", configPath().toString())); super.setUp(); @@ -144,7 +160,10 @@ public void tearDown() throws Exception { streamReaderService.shutdown(); futureCompletionService.shutdown(); remoteTransferRetry.shutdown(); + transferQueueConsumerService.shutdown(); scheduler.shutdown(); + otherPrioritySizeBasedBlockingQ.close(); + lowPrioritySizeBasedBlockingQ.close(); IOUtils.close(transferNIOGroup); if (previousOpenSearchPathConf != null) { @@ -242,7 +261,9 @@ protected AsyncMultiStreamBlobContainer createBlobContainer( ), asyncExecutorContainer, asyncExecutorContainer, - asyncExecutorContainer + asyncExecutorContainer, + otherPrioritySizeBasedBlockingQ, + lowPrioritySizeBasedBlockingQ ) ) { @Override diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java index 6fec535ae6301..cc7aad76fcd0c 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java @@ -169,7 +169,10 @@ private S3Repository createS3Repo(RepositoryMetadata metadata) { null, null, null, - false + false, + null, + null, + null ) { @Override protected void assertSnapshotOrGenericThread() { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java new file mode 100644 index 0000000000000..7417ea1631422 --- /dev/null +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/SizeBasedBlockingQTests.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.async; + +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.After; +import org.junit.Before; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +public class SizeBasedBlockingQTests extends OpenSearchTestCase { + private ExecutorService consumerService; + private ExecutorService producerService; + + @Override + @Before + public void setUp() throws Exception { + this.consumerService = Executors.newFixedThreadPool(10); + this.producerService = Executors.newFixedThreadPool(100); + super.setUp(); + } + + @After + public void tearDown() throws Exception { + consumerService.shutdown(); + producerService.shutdown(); + super.tearDown(); + } + + public void testProducerConsumerOfBulkItems() throws InterruptedException { + + SizeBasedBlockingQ sizeBasedBlockingQ = new SizeBasedBlockingQ( + new ByteSizeValue(ByteSizeUnit.BYTES.toBytes(10)), + consumerService, + 10 + ); + sizeBasedBlockingQ.start(); + int numOfItems = randomIntBetween(100, 1000); + CountDownLatch latch = new CountDownLatch(numOfItems); + for (int i = 0; i < numOfItems; i++) { + final int idx = i; + producerService.submit(() -> { + boolean throwException = randomBoolean(); + + SizeBasedBlockingQ.Item item = new TestItemToStr(randomIntBetween(1, 5), () -> { + latch.countDown(); + if (throwException) { + throw new RuntimeException("throwing random exception"); + } + }, idx); + try { + + sizeBasedBlockingQ.produce(item); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + latch.await(); + sizeBasedBlockingQ.close(); + } + + static class TestItemToStr extends SizeBasedBlockingQ.Item { + private final int id; + + public TestItemToStr(long size, Runnable consumable, int id) { + super(size, consumable); + this.id = id; + } + + @Override + public String toString() { + return String.valueOf(id); + } + } +} diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 4fef8c6179c8e..ab76150f8f83d 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -29,6 +29,7 @@ import org.opensearch.common.blobstore.transfer.stream.OffsetRangeIndexInputStream; import org.opensearch.common.blobstore.transfer.stream.OffsetRangeInputStream; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.index.store.exception.ChecksumCombinationException; import java.io.FileNotFoundException; @@ -355,6 +356,7 @@ private void uploadBlob( if (getBlobContainer() instanceof AsyncMultiStreamBlobContainer) { remoteIntegrityEnabled = ((AsyncMultiStreamBlobContainer) getBlobContainer()).remoteIntegrityCheckSupported(); } + lowPriorityUpload = lowPriorityUpload || contentLength > ByteSizeUnit.GB.toBytes(15); RemoteTransferContainer remoteTransferContainer = new RemoteTransferContainer( src, remoteFileName,