diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java index 91d1c4975d581..f00cda7bd36ec 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -312,7 +312,7 @@ protected S3Repository createRepository( ClusterService clusterService, RecoverySettings recoverySettings ) { - return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, false) { + return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) { @Override public BlobStore blobStore() { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3Reference.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3Reference.java index 0b5fcb6df280e..45170ea1ad209 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3Reference.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3Reference.java @@ -29,6 +29,7 @@ public class AmazonAsyncS3Reference extends RefCountedReleasable { client.client().close(); client.priorityClient().close(); + client.urgentClient().close(); AwsCredentialsProvider credentials = client.credentials(); if (credentials instanceof Closeable) { try { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3WithCredentials.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3WithCredentials.java index fa2db83729d25..f8a313b55d945 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3WithCredentials.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/AmazonAsyncS3WithCredentials.java @@ -19,16 +19,19 @@ final class AmazonAsyncS3WithCredentials { private final S3AsyncClient client; private final S3AsyncClient priorityClient; + private final S3AsyncClient urgentClient; private final AwsCredentialsProvider credentials; private AmazonAsyncS3WithCredentials( final S3AsyncClient client, final S3AsyncClient priorityClient, + final S3AsyncClient urgentClient, @Nullable final AwsCredentialsProvider credentials ) { this.client = client; this.credentials = credentials; this.priorityClient = priorityClient; + this.urgentClient = urgentClient; } S3AsyncClient client() { @@ -39,6 +42,10 @@ S3AsyncClient priorityClient() { return priorityClient; } + S3AsyncClient urgentClient() { + return urgentClient; + } + AwsCredentialsProvider credentials() { return credentials; } @@ -46,8 +53,9 @@ AwsCredentialsProvider credentials() { static AmazonAsyncS3WithCredentials create( final S3AsyncClient client, final S3AsyncClient priorityClient, + final S3AsyncClient urgentClient, @Nullable final AwsCredentialsProvider credentials ) { - return new AmazonAsyncS3WithCredentials(client, priorityClient, credentials); + return new AmazonAsyncS3WithCredentials(client, priorityClient, urgentClient, credentials); } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java index 08215ebdd45e0..262304029a0d3 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncService.java @@ -103,6 +103,7 @@ public synchronized void refreshAndClearCache(Map clie */ public AmazonAsyncS3Reference client( RepositoryMetadata repositoryMetadata, + AsyncExecutorContainer urgentExecutorBuilder, AsyncExecutorContainer priorityExecutorBuilder, AsyncExecutorContainer normalExecutorBuilder ) { @@ -119,7 +120,7 @@ public AmazonAsyncS3Reference client( return existing; } final AmazonAsyncS3Reference clientReference = new AmazonAsyncS3Reference( - buildClient(clientSettings, priorityExecutorBuilder, normalExecutorBuilder) + buildClient(clientSettings, urgentExecutorBuilder, priorityExecutorBuilder, normalExecutorBuilder) ); clientReference.incRef(); clientsCache = MapBuilder.newMapBuilder(clientsCache).put(clientSettings, clientReference).immutableMap(); @@ -165,6 +166,7 @@ S3ClientSettings settings(RepositoryMetadata repositoryMetadata) { // proxy for testing synchronized AmazonAsyncS3WithCredentials buildClient( final S3ClientSettings clientSettings, + AsyncExecutorContainer urgentExecutorBuilder, AsyncExecutorContainer priorityExecutorBuilder, AsyncExecutorContainer normalExecutorBuilder ) { @@ -195,6 +197,17 @@ synchronized AmazonAsyncS3WithCredentials buildClient( builder.forcePathStyle(true); } + builder.httpClient(buildHttpClient(clientSettings, urgentExecutorBuilder.getAsyncTransferEventLoopGroup())); + builder.asyncConfiguration( + ClientAsyncConfiguration.builder() + .advancedOption( + SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, + urgentExecutorBuilder.getFutureCompletionExecutor() + ) + .build() + ); + final S3AsyncClient urgentClient = SocketAccess.doPrivileged(builder::build); + builder.httpClient(buildHttpClient(clientSettings, priorityExecutorBuilder.getAsyncTransferEventLoopGroup())); builder.asyncConfiguration( ClientAsyncConfiguration.builder() @@ -217,7 +230,7 @@ synchronized AmazonAsyncS3WithCredentials buildClient( ); final S3AsyncClient client = SocketAccess.doPrivileged(builder::build); - return AmazonAsyncS3WithCredentials.create(client, priorityClient, credentials); + return AmazonAsyncS3WithCredentials.create(client, priorityClient, urgentClient, credentials); } static ClientOverrideConfiguration buildOverrideConfiguration(final S3ClientSettings clientSettings) { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 24aee99242957..c1180aab0e0c7 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -195,9 +195,14 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener comp StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize)); try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) { - S3AsyncClient s3AsyncClient = writeContext.getWritePriority() == WritePriority.HIGH - ? amazonS3Reference.get().priorityClient() - : amazonS3Reference.get().client(); + S3AsyncClient s3AsyncClient; + if (writeContext.getWritePriority() == WritePriority.URGENT) { + s3AsyncClient = amazonS3Reference.get().urgentClient(); + } else if (writeContext.getWritePriority() == WritePriority.HIGH) { + s3AsyncClient = amazonS3Reference.get().priorityClient(); + } else { + s3AsyncClient = amazonS3Reference.get().client(); + } CompletableFuture completableFuture = blobStore.getAsyncTransferManager() .uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher()); completableFuture.whenComplete((response, throwable) -> { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java index f568d871dd31a..e8e043357e126 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java @@ -84,6 +84,7 @@ class S3BlobStore implements BlobStore { private final StatsMetricPublisher statsMetricPublisher = new StatsMetricPublisher(); private final AsyncTransferManager asyncTransferManager; + private final AsyncExecutorContainer urgentExecutorBuilder; private final AsyncExecutorContainer priorityExecutorBuilder; private final AsyncExecutorContainer normalExecutorBuilder; private final boolean multipartUploadEnabled; @@ -100,6 +101,7 @@ class S3BlobStore implements BlobStore { int bulkDeletesSize, RepositoryMetadata repositoryMetadata, AsyncTransferManager asyncTransferManager, + AsyncExecutorContainer urgentExecutorBuilder, AsyncExecutorContainer priorityExecutorBuilder, AsyncExecutorContainer normalExecutorBuilder ) { @@ -116,6 +118,7 @@ class S3BlobStore implements BlobStore { this.asyncTransferManager = asyncTransferManager; this.normalExecutorBuilder = normalExecutorBuilder; this.priorityExecutorBuilder = priorityExecutorBuilder; + this.urgentExecutorBuilder = urgentExecutorBuilder; } @Override @@ -139,7 +142,7 @@ public AmazonS3Reference clientReference() { } public AmazonAsyncS3Reference asyncClientReference() { - return s3AsyncService.client(repositoryMetadata, priorityExecutorBuilder, normalExecutorBuilder); + return s3AsyncService.client(repositoryMetadata, urgentExecutorBuilder, priorityExecutorBuilder, normalExecutorBuilder); } int getMaxRetries() { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java index fd8952cb5abd1..95cf5eca0f2f6 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java @@ -265,6 +265,7 @@ class S3Repository extends MeteredBlobStoreRepository { private final AsyncTransferManager asyncUploadUtils; private final S3AsyncService s3AsyncService; private final boolean multipartUploadEnabled; + private final AsyncExecutorContainer urgentExecutorBuilder; private final AsyncExecutorContainer priorityExecutorBuilder; private final AsyncExecutorContainer normalExecutorBuilder; private final Path pluginConfigPath; @@ -279,6 +280,7 @@ class S3Repository extends MeteredBlobStoreRepository { final ClusterService clusterService, final RecoverySettings recoverySettings, final AsyncTransferManager asyncUploadUtils, + final AsyncExecutorContainer urgentExecutorBuilder, final AsyncExecutorContainer priorityExecutorBuilder, final AsyncExecutorContainer normalExecutorBuilder, final S3AsyncService s3AsyncService, @@ -291,6 +293,7 @@ class S3Repository extends MeteredBlobStoreRepository { clusterService, recoverySettings, asyncUploadUtils, + urgentExecutorBuilder, priorityExecutorBuilder, normalExecutorBuilder, s3AsyncService, @@ -309,6 +312,7 @@ class S3Repository extends MeteredBlobStoreRepository { final ClusterService clusterService, final RecoverySettings recoverySettings, final AsyncTransferManager asyncUploadUtils, + final AsyncExecutorContainer urgentExecutorBuilder, final AsyncExecutorContainer priorityExecutorBuilder, final AsyncExecutorContainer normalExecutorBuilder, final S3AsyncService s3AsyncService, @@ -321,6 +325,7 @@ class S3Repository extends MeteredBlobStoreRepository { this.multipartUploadEnabled = multipartUploadEnabled; this.pluginConfigPath = pluginConfigPath; this.asyncUploadUtils = asyncUploadUtils; + this.urgentExecutorBuilder = urgentExecutorBuilder; this.priorityExecutorBuilder = priorityExecutorBuilder; this.normalExecutorBuilder = normalExecutorBuilder; @@ -438,6 +443,7 @@ protected S3BlobStore createBlobStore() { bulkDeletesSize, metadata, asyncUploadUtils, + urgentExecutorBuilder, priorityExecutorBuilder, normalExecutorBuilder ); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java index c6450e49d08e2..9ed232464d080 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java @@ -75,6 +75,9 @@ * A plugin to add a repository type that writes to and from the AWS S3. */ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin { + + private static final String URGENT_FUTURE_COMPLETION = "urgent_future_completion"; + private static final String URGENT_STREAM_READER = "urgent_stream_reader"; private static final String PRIORITY_FUTURE_COMPLETION = "priority_future_completion"; private static final String PRIORITY_STREAM_READER = "priority_stream_reader"; private static final String FUTURE_COMPLETION = "future_completion"; @@ -85,6 +88,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo private final Path configPath; + private AsyncExecutorContainer urgentExecutorBuilder; private AsyncExecutorContainer priorityExecutorBuilder; private AsyncExecutorContainer normalExecutorBuilder; @@ -96,6 +100,10 @@ public S3RepositoryPlugin(final Settings settings, final Path configPath) { public List> getExecutorBuilders(Settings settings) { List> executorBuilders = new ArrayList<>(); int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors(settings)); + executorBuilders.add( + new FixedExecutorBuilder(settings, URGENT_FUTURE_COMPLETION, urgentPoolCount(settings), 10_000, URGENT_FUTURE_COMPLETION) + ); + executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5))); executorBuilders.add( new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION) ); @@ -128,6 +136,10 @@ private static int allocatedProcessors(Settings settings) { return OpenSearchExecutors.allocatedProcessors(settings); } + private static int urgentPoolCount(Settings settings) { + return boundedBy((allocatedProcessors(settings) + 7) / 8, 1, 2); + } + private static int priorityPoolCount(Settings settings) { return boundedBy((allocatedProcessors(settings) + 1) / 2, 2, 4); } @@ -150,8 +162,14 @@ public Collection createComponents( final IndexNameExpressionResolver expressionResolver, final Supplier repositoriesServiceSupplier ) { + int urgentEventLoopThreads = urgentPoolCount(clusterService.getSettings()); int priorityEventLoopThreads = priorityPoolCount(clusterService.getSettings()); int normalEventLoopThreads = normalPoolCount(clusterService.getSettings()); + this.urgentExecutorBuilder = new AsyncExecutorContainer( + threadPool.executor(URGENT_FUTURE_COMPLETION), + threadPool.executor(URGENT_STREAM_READER), + new AsyncTransferEventLoopGroup(urgentEventLoopThreads) + ); this.priorityExecutorBuilder = new AsyncExecutorContainer( threadPool.executor(PRIORITY_FUTURE_COMPLETION), threadPool.executor(PRIORITY_STREAM_READER), @@ -176,7 +194,8 @@ protected S3Repository createRepository( AsyncTransferManager asyncUploadUtils = new AsyncTransferManager( S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(), normalExecutorBuilder.getStreamReader(), - priorityExecutorBuilder.getStreamReader() + priorityExecutorBuilder.getStreamReader(), + urgentExecutorBuilder.getStreamReader() ); return new S3Repository( metadata, @@ -185,6 +204,7 @@ protected S3Repository createRepository( clusterService, recoverySettings, asyncUploadUtils, + urgentExecutorBuilder, priorityExecutorBuilder, normalExecutorBuilder, s3AsyncService, diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java index 6007d9f9c8a1c..933ee6dc29513 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncPartsHandler.java @@ -48,6 +48,7 @@ public class AsyncPartsHandler { * @param s3AsyncClient S3 client to use for upload * @param executorService Thread pool for regular upload * @param priorityExecutorService Thread pool for priority uploads + * @param urgentExecutorService Thread pool for urgent uploads * @param uploadRequest request for upload * @param streamContext Stream context used in supplying individual file parts * @param uploadId Upload Id against which multi-part is being performed @@ -60,6 +61,7 @@ public static List> uploadParts( S3AsyncClient s3AsyncClient, ExecutorService executorService, ExecutorService priorityExecutorService, + ExecutorService urgentExecutorService, UploadRequest uploadRequest, StreamContext streamContext, String uploadId, @@ -83,6 +85,7 @@ public static List> uploadParts( s3AsyncClient, executorService, priorityExecutorService, + urgentExecutorService, completedParts, inputStreamContainers, futures, @@ -129,6 +132,7 @@ private static void uploadPart( S3AsyncClient s3AsyncClient, ExecutorService executorService, ExecutorService priorityExecutorService, + ExecutorService urgentExecutorService, AtomicReferenceArray completedParts, AtomicReferenceArray inputStreamContainers, List> futures, @@ -138,9 +142,14 @@ private static void uploadPart( ) { Integer partNumber = uploadPartRequest.partNumber(); - ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH - ? priorityExecutorService - : executorService; + ExecutorService streamReadExecutor; + if (uploadRequest.getWritePriority() == WritePriority.URGENT) { + streamReadExecutor = urgentExecutorService; + } else if (uploadRequest.getWritePriority() == WritePriority.HIGH) { + streamReadExecutor = priorityExecutorService; + } else { + streamReadExecutor = executorService; + } // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered // data can be retried instead of retrying whole file by the application. InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java index a52745e33073e..4f1ab9764702e 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java @@ -61,6 +61,7 @@ public final class AsyncTransferManager { private static final Logger log = LogManager.getLogger(AsyncTransferManager.class); private final ExecutorService executorService; private final ExecutorService priorityExecutorService; + private final ExecutorService urgentExecutorService; private final long minimumPartSize; /** @@ -75,10 +76,16 @@ public final class AsyncTransferManager { * @param executorService The stream reader {@link ExecutorService} for normal priority uploads * @param priorityExecutorService The stream read {@link ExecutorService} for high priority uploads */ - public AsyncTransferManager(long minimumPartSize, ExecutorService executorService, ExecutorService priorityExecutorService) { + public AsyncTransferManager( + long minimumPartSize, + ExecutorService executorService, + ExecutorService priorityExecutorService, + ExecutorService urgentExecutorService + ) { this.executorService = executorService; this.priorityExecutorService = priorityExecutorService; this.minimumPartSize = minimumPartSize; + this.urgentExecutorService = urgentExecutorService; } /** @@ -162,6 +169,7 @@ private void doUploadInParts( s3AsyncClient, executorService, priorityExecutorService, + urgentExecutorService, uploadRequest, streamContext, uploadId, @@ -308,9 +316,14 @@ private void uploadInOneChunk( putObjectRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32); putObjectRequestBuilder.checksumCRC32(base64StringFromLong(uploadRequest.getExpectedChecksum())); } - ExecutorService streamReadExecutor = uploadRequest.getWritePriority() == WritePriority.HIGH - ? priorityExecutorService - : executorService; + ExecutorService streamReadExecutor; + if (uploadRequest.getWritePriority() == WritePriority.URGENT) { + streamReadExecutor = urgentExecutorService; + } else if (uploadRequest.getWritePriority() == WritePriority.HIGH) { + streamReadExecutor = priorityExecutorService; + } else { + streamReadExecutor = executorService; + } // Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered // data can be retried instead of retrying whole file by the application. InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1)); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java index a4bfe11383b4f..8e1926d40302f 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/RepositoryCredentialsTests.java @@ -302,7 +302,7 @@ protected S3Repository createRepository( ClusterService clusterService, RecoverySettings recoverySettings ) { - return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, false) { + return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually on test/main threads diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3AsyncServiceTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3AsyncServiceTests.java index e9fe557ab751a..de9ad46bb222d 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3AsyncServiceTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3AsyncServiceTests.java @@ -44,12 +44,12 @@ public void testCachedClientsAreReleased() { final S3ClientSettings otherClientSettings = s3AsyncService.settings(metadata2); assertSame(clientSettings, otherClientSettings); final AmazonAsyncS3Reference reference = SocketAccess.doPrivileged( - () -> s3AsyncService.client(metadata1, asyncExecutorContainer, asyncExecutorContainer) + () -> s3AsyncService.client(metadata1, asyncExecutorContainer, asyncExecutorContainer, asyncExecutorContainer) ); reference.close(); s3AsyncService.close(); final AmazonAsyncS3Reference referenceReloaded = SocketAccess.doPrivileged( - () -> s3AsyncService.client(metadata1, asyncExecutorContainer, asyncExecutorContainer) + () -> s3AsyncService.client(metadata1, asyncExecutorContainer, asyncExecutorContainer, asyncExecutorContainer) ); assertNotSame(referenceReloaded, reference); referenceReloaded.close(); @@ -79,12 +79,12 @@ public void testCachedClientsWithCredentialsAreReleased() { final S3ClientSettings otherClientSettings = s3AsyncService.settings(metadata2); assertSame(clientSettings, otherClientSettings); final AmazonAsyncS3Reference reference = SocketAccess.doPrivileged( - () -> s3AsyncService.client(metadata1, asyncExecutorContainer, asyncExecutorContainer) + () -> s3AsyncService.client(metadata1, asyncExecutorContainer, asyncExecutorContainer, asyncExecutorContainer) ); reference.close(); s3AsyncService.close(); final AmazonAsyncS3Reference referenceReloaded = SocketAccess.doPrivileged( - () -> s3AsyncService.client(metadata1, asyncExecutorContainer, asyncExecutorContainer) + () -> s3AsyncService.client(metadata1, asyncExecutorContainer, asyncExecutorContainer, asyncExecutorContainer) ); assertNotSame(referenceReloaded, reference); referenceReloaded.close(); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java index 6eb8faa746d34..7c67519f2f3b0 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerMockClientTests.java @@ -266,10 +266,11 @@ public void verifySingleChunkUploadCallCount(boolean finalizeUploadFailure) { @Override public AmazonAsyncS3Reference client( RepositoryMetadata repositoryMetadata, + AsyncExecutorContainer urgentExecutorBuilder, AsyncExecutorContainer priorityExecutorBuilder, AsyncExecutorContainer normalExecutorBuilder ) { - return new AmazonAsyncS3Reference(AmazonAsyncS3WithCredentials.create(asyncClient, asyncClient, null)); + return new AmazonAsyncS3Reference(AmazonAsyncS3WithCredentials.create(asyncClient, asyncClient, asyncClient, null)); } } @@ -393,9 +394,11 @@ private S3BlobStore createBlobStore() { new AsyncTransferManager( S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), asyncExecutorContainer.getStreamReader(), + asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader() ), asyncExecutorContainer, + asyncExecutorContainer, asyncExecutorContainer ); } diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java index a2214f5218991..ceab06bd051e9 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobContainerRetriesTests.java @@ -221,9 +221,11 @@ protected AsyncMultiStreamBlobContainer createBlobContainer( new AsyncTransferManager( S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(), asyncExecutorContainer.getStreamReader(), + asyncExecutorContainer.getStreamReader(), asyncExecutorContainer.getStreamReader() ), asyncExecutorContainer, + asyncExecutorContainer, asyncExecutorContainer ) ) { diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index 2701cae6a733b..58ad290a31e85 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -935,7 +935,7 @@ public void testReadBlobAsyncMultiPart() throws Exception { final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference( - AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) + AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null) ); final S3BlobStore blobStore = mock(S3BlobStore.class); @@ -993,7 +993,7 @@ public void testReadBlobAsyncSinglePart() throws Exception { final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference( - AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) + AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null) ); final S3BlobStore blobStore = mock(S3BlobStore.class); final BlobPath blobPath = new BlobPath(); @@ -1048,7 +1048,7 @@ public void testReadBlobAsyncFailure() throws Exception { final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference( - AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) + AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null) ); final S3BlobStore blobStore = mock(S3BlobStore.class); @@ -1091,7 +1091,7 @@ public void testReadBlobAsyncOnCompleteFailureMissingData() throws Exception { final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); final AmazonAsyncS3Reference amazonAsyncS3Reference = new AmazonAsyncS3Reference( - AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, null) + AmazonAsyncS3WithCredentials.create(s3AsyncClient, s3AsyncClient, s3AsyncClient, null) ); final S3BlobStore blobStore = mock(S3BlobStore.class); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java index e65ca69a5047b..6fec535ae6301 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3RepositoryTests.java @@ -168,6 +168,7 @@ private S3Repository createS3Repo(RepositoryMetadata metadata) { null, null, null, + null, false ) { @Override diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java index 97a746cdeed93..2437547a80a6f 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/AsyncTransferManagerTests.java @@ -64,6 +64,7 @@ public void setUp() throws Exception { asyncTransferManager = new AsyncTransferManager( ByteSizeUnit.MB.toBytes(5), Executors.newSingleThreadExecutor(), + Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor() ); super.setUp(); diff --git a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java index b8c0b52f93a3c..3f341c878c3c7 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java +++ b/server/src/main/java/org/opensearch/common/blobstore/stream/write/WritePriority.java @@ -15,5 +15,6 @@ */ public enum WritePriority { NORMAL, - HIGH + HIGH, + URGENT } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index ae4a3fab9852d..025ad075d83b6 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -378,7 +378,7 @@ private String writeGlobalMetadata(ClusterState clusterState) throws IOException result.set(globalMetadataContainer.path().buildAsString() + globalMetadataFilename); }, ex -> { throw new GlobalMetadataTransferException(ex.getMessage(), ex); }), latch); - GLOBAL_METADATA_FORMAT.writeAsync( + GLOBAL_METADATA_FORMAT.writeAsyncWithUrgentPriority( clusterState.metadata(), globalMetadataContainer, globalMetadataFilename, @@ -510,7 +510,7 @@ private void writeIndexMetadataAsync( ex -> latchedActionListener.onFailure(new IndexMetadataTransferException(indexMetadata.getIndex().toString(), ex)) ); - INDEX_METADATA_FORMAT.writeAsync( + INDEX_METADATA_FORMAT.writeAsyncWithUrgentPriority( indexMetadata, indexMetadataContainer, indexMetadataFilename, diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java index e280141c12bc1..3e6052a5ef820 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -197,21 +197,56 @@ public void write( } /** - * Writes blob with resolving the blob name using {@link #blobName} method. - * Leverages the multipart upload if supported by the blobContainer. + * Internally calls {@link #writeAsyncWithPriority} with {@link WritePriority#NORMAL} + */ + public void writeAsync( + final T obj, + final BlobContainer blobContainer, + final String name, + final Compressor compressor, + ActionListener listener, + final ToXContent.Params params + ) throws IOException { + // use NORMAL priority by default + this.writeAsyncWithPriority(obj, blobContainer, name, compressor, WritePriority.NORMAL, listener, params); + } + + /** + * Internally calls {@link #writeAsyncWithPriority} with {@link WritePriority#URGENT} + *

+ * NOTE: We use this method to upload urgent priority objects like cluster state to remote stores. + * Use {@link #writeAsync(ToXContent, BlobContainer, String, Compressor, ActionListener, ToXContent.Params)} for + * other use cases. + */ + public void writeAsyncWithUrgentPriority( + final T obj, + final BlobContainer blobContainer, + final String name, + final Compressor compressor, + ActionListener listener, + final ToXContent.Params params + ) throws IOException { + this.writeAsyncWithPriority(obj, blobContainer, name, compressor, WritePriority.URGENT, listener, params); + } + + /** + * Method to writes blob with resolving the blob name using {@link #blobName} method with specified + * {@link WritePriority}. Leverages the multipart upload if supported by the blobContainer. * * @param obj object to be serialized * @param blobContainer blob container * @param name blob name * @param compressor whether to use compression + * @param priority write priority to be used * @param listener listener to listen to write result * @param params ToXContent params */ - public void writeAsync( + private void writeAsyncWithPriority( final T obj, final BlobContainer blobContainer, final String name, final Compressor compressor, + final WritePriority priority, ActionListener listener, final ToXContent.Params params ) throws IOException { @@ -222,7 +257,7 @@ public void writeAsync( } final String blobName = blobName(name); final BytesReference bytes = serialize(obj, blobName, compressor, params); - final String resourceDescription = "ChecksumBlobStoreFormat.writeAsync(blob=\"" + blobName + "\")"; + final String resourceDescription = "ChecksumBlobStoreFormat.writeAsyncWithPriority(blob=\"" + blobName + "\")"; try (IndexInput input = new ByteArrayIndexInput(resourceDescription, BytesReference.toBytes(bytes))) { long expectedChecksum; try { @@ -242,7 +277,7 @@ public void writeAsync( blobName, bytes.length(), true, - WritePriority.HIGH, + priority, (size, position) -> new OffsetRangeIndexInputStream(input, size, position), expectedChecksum, ((AsyncMultiStreamBlobContainer) blobContainer).remoteIntegrityCheckSupported() diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java index 4be5fc03c2a6d..173e15b8eca37 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteClusterStateServiceTests.java @@ -273,7 +273,7 @@ public void testWriteFullMetadataInParallelSuccess() throws IOException { new BytesArray(writtenBytes) ); - assertEquals(capturedWriteContext.getWritePriority(), WritePriority.HIGH); + assertEquals(capturedWriteContext.getWritePriority(), WritePriority.URGENT); assertEquals(writtenIndexMetadata.getNumberOfShards(), 1); assertEquals(writtenIndexMetadata.getNumberOfReplicas(), 0); assertEquals(writtenIndexMetadata.getIndex().getName(), "test-index"); diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index c114b56bd0b39..c5f36fcc01983 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -43,6 +43,7 @@ import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.blobstore.stream.read.ReadContext; import org.opensearch.common.blobstore.stream.write.WriteContext; +import org.opensearch.common.blobstore.stream.write.WritePriority; import org.opensearch.common.compress.DeflateCompressor; import org.opensearch.common.io.Streams; import org.opensearch.common.io.stream.BytesStreamOutput; @@ -65,8 +66,13 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; +import org.mockito.ArgumentCaptor; + import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThan; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; public class BlobStoreFormatTests extends OpenSearchTestCase { @@ -128,44 +134,36 @@ public void testBlobStoreAsyncOperations() throws IOException, InterruptedExcept BlobPath.cleanPath(), null ); + MockFsVerifyingBlobContainer spyContainer = spy(mockBlobContainer); ChecksumBlobStoreFormat checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); - + ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); + ArgumentCaptor writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class); CountDownLatch latch = new CountDownLatch(2); - ActionListener actionListener = new ActionListener<>() { - @Override - public void onResponse(Void unused) { - logger.info("---> Async write succeeded"); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - logger.info("---> Failure in async write"); - throw new RuntimeException("async write should not fail"); - } - }; - // Write blobs in different formats checksumSMILE.writeAsync( new BlobObj("checksum smile"), - mockBlobContainer, + spyContainer, "check-smile", CompressorRegistry.none(), - actionListener, + getVoidActionListener(latch), ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS ); checksumSMILE.writeAsync( new BlobObj("checksum smile compressed"), - mockBlobContainer, + spyContainer, "check-smile-comp", CompressorRegistry.getCompressor(DeflateCompressor.NAME), - actionListener, + getVoidActionListener(latch), ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS ); latch.await(); + verify(spyContainer, times(2)).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); + assertEquals(2, writeContextArgumentCaptor.getAllValues().size()); + writeContextArgumentCaptor.getAllValues() + .forEach(writeContext -> assertEquals(WritePriority.NORMAL, writeContext.getWritePriority())); // Assert that all checksum blobs can be read assertEquals(checksumSMILE.read(mockBlobContainer.getDelegate(), "check-smile", xContentRegistry()).getText(), "checksum smile"); assertEquals( @@ -174,6 +172,39 @@ public void onFailure(Exception e) { ); } + public void testBlobStorePriorityAsyncOperation() throws IOException, InterruptedException { + BlobStore blobStore = createTestBlobStore(); + MockFsVerifyingBlobContainer mockBlobContainer = new MockFsVerifyingBlobContainer( + (FsBlobStore) blobStore, + BlobPath.cleanPath(), + null + ); + MockFsVerifyingBlobContainer spyContainer = spy(mockBlobContainer); + ChecksumBlobStoreFormat checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); + + ArgumentCaptor> actionListenerArgumentCaptor = ArgumentCaptor.forClass(ActionListener.class); + ArgumentCaptor writeContextArgumentCaptor = ArgumentCaptor.forClass(WriteContext.class); + CountDownLatch latch = new CountDownLatch(1); + + // Write blobs in different formats + checksumSMILE.writeAsyncWithUrgentPriority( + new BlobObj("cluster state diff"), + spyContainer, + "cluster-state-diff", + CompressorRegistry.none(), + getVoidActionListener(latch), + ChecksumBlobStoreFormat.SNAPSHOT_ONLY_FORMAT_PARAMS + ); + latch.await(); + + verify(spyContainer).asyncBlobUpload(writeContextArgumentCaptor.capture(), actionListenerArgumentCaptor.capture()); + assertEquals(WritePriority.URGENT, writeContextArgumentCaptor.getValue().getWritePriority()); + assertEquals( + checksumSMILE.read(mockBlobContainer.getDelegate(), "cluster-state-diff", xContentRegistry()).getText(), + "cluster state diff" + ); + } + public void testBlobStoreOperations() throws IOException { BlobStore blobStore = createTestBlobStore(); BlobContainer blobContainer = blobStore.blobContainer(BlobPath.cleanPath()); @@ -228,6 +259,24 @@ public void testBlobCorruption() throws IOException { } } + private ActionListener getVoidActionListener(CountDownLatch latch) { + ActionListener actionListener = new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("---> Async write succeeded"); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + logger.info("---> Failure in async write"); + throw new RuntimeException("async write should not fail"); + } + }; + + return actionListener; + } + protected BlobStore createTestBlobStore() throws IOException { return new FsBlobStore(randomIntBetween(1, 8) * 1024, createTempDir(), false); }