From 34b8888180083048f12e93f7550ee168b8287020 Mon Sep 17 00:00:00 2001 From: Ashish Singh Date: Sun, 22 Sep 2024 22:41:49 +0530 Subject: [PATCH] Add support for async deletion in S3BlobContainer (#15621) (#16013) * Add support for async deletion in S3BlobContainer * Move helper methods to helper class * Minor refactor * Add UTs * Add more tests * Integrate async deletion in the snapshot interactions --------- Signed-off-by: Ashish Singh --- CHANGELOG.md | 1 + .../s3/S3BlobStoreRepositoryTests.java | 1 + .../s3/S3RepositoryThirdPartyTests.java | 8 + .../repositories/s3/S3BlobContainer.java | 124 +++++ .../repositories/s3/S3BlobStore.java | 2 +- .../repositories/s3/StatsMetricPublisher.java | 4 + .../s3/async/S3AsyncDeleteHelper.java | 95 ++++ .../s3/S3BlobStoreContainerTests.java | 507 +++++++++++++++++- .../s3/async/S3AsyncDeleteHelperTests.java | 236 ++++++++ .../mocks/MockFsAsyncBlobContainer.java | 12 + .../AsyncMultiStreamBlobContainer.java | 5 + ...syncMultiStreamEncryptedBlobContainer.java | 10 + .../common/settings/ClusterSettings.java | 3 + .../blobstore/BlobStoreRepository.java | 60 ++- .../BlobStoreTransferServiceTests.java | 12 + .../snapshots/BlobStoreFormatTests.java | 12 + 16 files changed, 1086 insertions(+), 6 deletions(-) create mode 100644 plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java create mode 100644 plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelperTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index b6ae0c3b83f30..0ea17c5b84938 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added - [Offline Nodes] Adds offline-tasks library containing various interfaces to be used for Offline Background Tasks. ([#13574](https://github.com/opensearch-project/OpenSearch/pull/13574)) - Add path prefix support to hashed prefix snapshots ([#15664](https://github.com/opensearch-project/OpenSearch/pull/15664)) +- Add support for async deletion in S3BlobContainer ([#15621](https://github.com/opensearch-project/OpenSearch/pull/15621)) - [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651)) - Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916)) - Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967)) diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java index 38e0786bb1f17..909c64ce25372 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -166,6 +166,7 @@ protected Settings nodeSettings(int nodeOrdinal) { // Disable request throttling because some random values in tests might generate too many failures for the S3 client .put(S3ClientSettings.USE_THROTTLE_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), false) .put(S3ClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace("test").getKey(), ProxySettings.ProxyType.DIRECT) + .put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false) .put(super.nodeSettings(nodeOrdinal)) .setSecureSettings(secureSettings); diff --git a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java index 7db9a0d3ba790..f0e40db965646 100644 --- a/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/plugins/repository-s3/src/internalClusterTest/java/org/opensearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -55,6 +55,14 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTestCase { + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false) + .build(); + } + @Override @Before @SuppressForbidden(reason = "Need to set system property here for AWS SDK v2") diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index b489a3cc85037..1a402e8431e25 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -62,6 +62,7 @@ import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; +import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher; import software.amazon.awssdk.utils.CollectionUtils; import org.apache.logging.log4j.LogManager; @@ -90,6 +91,7 @@ import org.opensearch.core.common.Strings; import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.repositories.s3.async.S3AsyncDeleteHelper; import org.opensearch.repositories.s3.async.SizeBasedBlockingQ; import org.opensearch.repositories.s3.async.UploadRequest; import org.opensearch.repositories.s3.utils.HttpRangeUtils; @@ -109,6 +111,9 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE; import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART; import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART; @@ -875,4 +880,123 @@ CompletableFuture getBlobMetadata(S3AsyncClient s3A return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest)); } + + @Override + public void deleteAsync(ActionListener completionListener) { + try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) { + S3AsyncClient s3AsyncClient = asyncClientReference.get().client(); + + ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build(); + ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest); + + AtomicLong deletedBlobs = new AtomicLong(); + AtomicLong deletedBytes = new AtomicLong(); + + CompletableFuture listingFuture = new CompletableFuture<>(); + + listPublisher.subscribe(new Subscriber<>() { + private Subscription subscription; + private final List objectsToDelete = new ArrayList<>(); + private CompletableFuture deletionChain = CompletableFuture.completedFuture(null); + + @Override + public void onSubscribe(Subscription s) { + this.subscription = s; + subscription.request(1); + } + + @Override + public void onNext(ListObjectsV2Response response) { + response.contents().forEach(s3Object -> { + deletedBlobs.incrementAndGet(); + deletedBytes.addAndGet(s3Object.size()); + objectsToDelete.add(s3Object.key()); + }); + + int bulkDeleteSize = blobStore.getBulkDeletesSize(); + if (objectsToDelete.size() >= bulkDeleteSize) { + int fullBatchesCount = objectsToDelete.size() / bulkDeleteSize; + int itemsToDelete = fullBatchesCount * bulkDeleteSize; + + List batchToDelete = new ArrayList<>(objectsToDelete.subList(0, itemsToDelete)); + objectsToDelete.subList(0, itemsToDelete).clear(); + + deletionChain = S3AsyncDeleteHelper.executeDeleteChain( + s3AsyncClient, + blobStore, + batchToDelete, + deletionChain, + () -> subscription.request(1) + ); + } else { + subscription.request(1); + } + } + + @Override + public void onError(Throwable t) { + listingFuture.completeExceptionally(new IOException("Failed to list objects for deletion", t)); + } + + @Override + public void onComplete() { + if (!objectsToDelete.isEmpty()) { + deletionChain = S3AsyncDeleteHelper.executeDeleteChain( + s3AsyncClient, + blobStore, + objectsToDelete, + deletionChain, + null + ); + } + deletionChain.whenComplete((v, throwable) -> { + if (throwable != null) { + listingFuture.completeExceptionally(throwable); + } else { + listingFuture.complete(null); + } + }); + } + }); + + listingFuture.whenComplete((v, throwable) -> { + if (throwable != null) { + completionListener.onFailure( + throwable instanceof Exception + ? (Exception) throwable + : new IOException("Unexpected error during async deletion", throwable) + ); + } else { + completionListener.onResponse(new DeleteResult(deletedBlobs.get(), deletedBytes.get())); + } + }); + } catch (Exception e) { + completionListener.onFailure(new IOException("Failed to initiate async deletion", e)); + } + } + + @Override + public void deleteBlobsAsyncIgnoringIfNotExists(List blobNames, ActionListener completionListener) { + if (blobNames.isEmpty()) { + completionListener.onResponse(null); + return; + } + + try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) { + S3AsyncClient s3AsyncClient = asyncClientReference.get().client(); + + List keysToDelete = blobNames.stream().map(this::buildKey).collect(Collectors.toList()); + + S3AsyncDeleteHelper.executeDeleteChain(s3AsyncClient, blobStore, keysToDelete, CompletableFuture.completedFuture(null), null) + .whenComplete((v, throwable) -> { + if (throwable != null) { + completionListener.onFailure(new IOException("Failed to delete blobs " + blobNames, throwable)); + } else { + completionListener.onResponse(null); + } + }); + } catch (Exception e) { + completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e)); + } + } } diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java index f688be9216b8f..90bfa11e18481 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java @@ -63,7 +63,7 @@ import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING; import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED; -class S3BlobStore implements BlobStore { +public class S3BlobStore implements BlobStore { private static final Logger logger = LogManager.getLogger(S3BlobStore.class); diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java index 8d2772d42ebca..9f73c67df3b18 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java @@ -95,6 +95,10 @@ public void publish(MetricCollection metricCollection) { public void close() {} }; + public MetricPublisher getDeleteObjectsMetricPublisher() { + return deleteObjectsMetricPublisher; + } + public MetricPublisher getObjectMetricPublisher = new MetricPublisher() { @Override public void publish(MetricCollection metricCollection) { diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java new file mode 100644 index 0000000000000..eed95c0e68ef1 --- /dev/null +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelper.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.async; + +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.repositories.s3.S3BlobStore; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class S3AsyncDeleteHelper { + private static final Logger logger = LogManager.getLogger(S3AsyncDeleteHelper.class); + + public static CompletableFuture executeDeleteChain( + S3AsyncClient s3AsyncClient, + S3BlobStore blobStore, + List objectsToDelete, + CompletableFuture currentChain, + Runnable afterDeleteAction + ) { + List> batches = createDeleteBatches(objectsToDelete, blobStore.getBulkDeletesSize()); + CompletableFuture newChain = currentChain.thenCompose(v -> executeDeleteBatches(s3AsyncClient, blobStore, batches)); + if (afterDeleteAction != null) { + newChain = newChain.thenRun(afterDeleteAction); + } + return newChain; + } + + static List> createDeleteBatches(List keys, int bulkDeleteSize) { + List> batches = new ArrayList<>(); + for (int i = 0; i < keys.size(); i += bulkDeleteSize) { + batches.add(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize))); + } + return batches; + } + + static CompletableFuture executeDeleteBatches(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List> batches) { + CompletableFuture allDeletesFuture = CompletableFuture.completedFuture(null); + + for (List batch : batches) { + allDeletesFuture = allDeletesFuture.thenCompose(v -> executeSingleDeleteBatch(s3AsyncClient, blobStore, batch)); + } + + return allDeletesFuture; + } + + static CompletableFuture executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, S3BlobStore blobStore, List batch) { + DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore); + return s3AsyncClient.deleteObjects(deleteRequest).thenApply(S3AsyncDeleteHelper::processDeleteResponse); + } + + static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) { + if (!deleteObjectsResponse.errors().isEmpty()) { + logger.warn( + () -> new ParameterizedMessage( + "Failed to delete some blobs {}", + deleteObjectsResponse.errors() + .stream() + .map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]") + .collect(Collectors.toList()) + ) + ); + } + return null; + } + + static DeleteObjectsRequest bulkDelete(String bucket, List blobs, S3BlobStore blobStore) { + return DeleteObjectsRequest.builder() + .bucket(bucket) + .delete( + Delete.builder() + .objects(blobs.stream().map(blob -> ObjectIdentifier.builder().key(blob).build()).collect(Collectors.toList())) + .quiet(true) + .build() + ) + .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().getDeleteObjectsMetricPublisher())) + .build(); + } +} diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java index 654d8a72690c4..2cb11541d924f 100644 --- a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/S3BlobStoreContainerTests.java @@ -69,6 +69,7 @@ import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; +import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher; import org.opensearch.action.LatchedActionListener; import org.opensearch.common.blobstore.BlobContainer; @@ -99,22 +100,28 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.mockito.ArgumentCaptor; import org.mockito.ArgumentMatchers; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; public class S3BlobStoreContainerTests extends OpenSearchTestCase { @@ -1275,6 +1282,504 @@ public void testTransformResponseToInputStreamContainer() throws Exception { assertEquals(inputStream.available(), inputStreamContainer.getInputStream().available()); } + public void testDeleteAsync() throws Exception { + for (int i = 0; i < 100; i++) { + testDeleteAsync(i + 1); + } + } + + private void testDeleteAsync(int bulkDeleteSize) throws InterruptedException { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.getBulkDeletesSize()).thenReturn(bulkDeleteSize); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create( + s3AsyncClient, + s3AsyncClient, + s3AsyncClient, + null + ); + when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials); + + final List s3Objects = new ArrayList<>(); + int numObjects = randomIntBetween(20, 100); + long totalSize = 0; + for (int i = 0; i < numObjects; i++) { + long size = randomIntBetween(1, 100); + s3Objects.add(S3Object.builder().key(randomAlphaOfLength(10)).size(size).build()); + totalSize += size; + } + + final List responseList = new ArrayList<>(); + int size = 0; + while (size < numObjects) { + int toAdd = randomIntBetween(10, 20); + int endIndex = Math.min(numObjects, size + toAdd); + responseList.add(ListObjectsV2Response.builder().contents(s3Objects.subList(size, endIndex)).build()); + size = endIndex; + } + int expectedDeletedObjectsCall = numObjects / bulkDeleteSize + (numObjects % bulkDeleteSize > 0 ? 1 : 0); + + final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class); + AtomicInteger counter = new AtomicInteger(); + doAnswer(invocation -> { + Subscriber subscriber = invocation.getArgument(0); + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + int currentCounter = counter.getAndIncrement(); + if (currentCounter < responseList.size()) { + subscriber.onNext(responseList.get(currentCounter)); + } + if (currentCounter == responseList.size()) { + subscriber.onComplete(); + } + } + + @Override + public void cancel() {} + }); + return null; + }).when(listPublisher).subscribe(ArgumentMatchers.>any()); + when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher); + + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn( + CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build()) + ); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference deleteResultRef = new AtomicReference<>(); + blobContainer.deleteAsync(new ActionListener<>() { + @Override + public void onResponse(DeleteResult deleteResult) { + deleteResultRef.set(deleteResult); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + logger.error("exception during deleteAsync", e); + fail("Unexpected failure: " + e.getMessage()); + } + }); + + latch.await(); + + DeleteResult deleteResult = deleteResultRef.get(); + assertEquals(numObjects, deleteResult.blobsDeleted()); + assertEquals(totalSize, deleteResult.bytesDeleted()); + + verify(s3AsyncClient, times(1)).listObjectsV2Paginator(any(ListObjectsV2Request.class)); + verify(s3AsyncClient, times(expectedDeletedObjectsCall)).deleteObjects(any(DeleteObjectsRequest.class)); + } + + public void testDeleteAsyncFailure() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.getBulkDeletesSize()).thenReturn(1000); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create( + s3AsyncClient, + s3AsyncClient, + s3AsyncClient, + null + ); + when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials); + + // Simulate a failure in listObjectsV2Paginator + RuntimeException simulatedFailure = new RuntimeException("Simulated failure"); + when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenThrow(simulatedFailure); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionRef = new AtomicReference<>(); + blobContainer.deleteAsync(new ActionListener<>() { + @Override + public void onResponse(DeleteResult deleteResult) { + fail("Expected a failure, but got a success response"); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }); + + latch.await(); + + assertNotNull(exceptionRef.get()); + assertEquals(IOException.class, exceptionRef.get().getClass()); + assertEquals("Failed to initiate async deletion", exceptionRef.get().getMessage()); + assertEquals(simulatedFailure, exceptionRef.get().getCause()); + } + + public void testDeleteAsyncListingError() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.getBulkDeletesSize()).thenReturn(1000); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create( + s3AsyncClient, + s3AsyncClient, + s3AsyncClient, + null + ); + when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials); + + final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class); + doAnswer(invocation -> { + Subscriber subscriber = invocation.getArgument(0); + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + subscriber.onError(new RuntimeException("Simulated listing error")); + } + + @Override + public void cancel() {} + }); + return null; + }).when(listPublisher).subscribe(ArgumentMatchers.>any()); + when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionRef = new AtomicReference<>(); + blobContainer.deleteAsync(new ActionListener<>() { + @Override + public void onResponse(DeleteResult deleteResult) { + fail("Expected a failure, but got a success response"); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }); + + latch.await(); + + assertNotNull(exceptionRef.get()); + assertEquals(IOException.class, exceptionRef.get().getClass()); + assertEquals("Failed to list objects for deletion", exceptionRef.get().getMessage()); + assertNotNull(exceptionRef.get().getCause()); + assertEquals("Simulated listing error", exceptionRef.get().getCause().getMessage()); + } + + public void testDeleteAsyncDeletionError() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.getBulkDeletesSize()).thenReturn(1000); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create( + s3AsyncClient, + s3AsyncClient, + s3AsyncClient, + null + ); + when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials); + + final ListObjectsV2Publisher listPublisher = mock(ListObjectsV2Publisher.class); + doAnswer(invocation -> { + Subscriber subscriber = invocation.getArgument(0); + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long n) { + subscriber.onNext( + ListObjectsV2Response.builder().contents(S3Object.builder().key("test-key").size(100L).build()).build() + ); + subscriber.onComplete(); + } + + @Override + public void cancel() {} + }); + return null; + }).when(listPublisher).subscribe(ArgumentMatchers.>any()); + when(s3AsyncClient.listObjectsV2Paginator(any(ListObjectsV2Request.class))).thenReturn(listPublisher); + + // Simulate a failure in deleteObjects + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(new RuntimeException("Simulated delete error")); + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(failedFuture); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionRef = new AtomicReference<>(); + blobContainer.deleteAsync(new ActionListener<>() { + @Override + public void onResponse(DeleteResult deleteResult) { + fail("Expected a failure, but got a success response"); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }); + + latch.await(); + + assertNotNull(exceptionRef.get()); + assertEquals(CompletionException.class, exceptionRef.get().getClass()); + assertEquals("java.lang.RuntimeException: Simulated delete error", exceptionRef.get().getMessage()); + assertNotNull(exceptionRef.get().getCause()); + assertEquals("Simulated delete error", exceptionRef.get().getCause().getMessage()); + } + + public void testDeleteBlobsAsyncIgnoringIfNotExists() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + int bulkDeleteSize = randomIntBetween(1, 10); + when(blobStore.getBulkDeletesSize()).thenReturn(bulkDeleteSize); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create( + s3AsyncClient, + s3AsyncClient, + s3AsyncClient, + null + ); + when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials); + + final List blobNames = new ArrayList<>(); + int size = randomIntBetween(10, 100); + for (int i = 0; i < size; i++) { + blobNames.add(randomAlphaOfLength(10)); + } + int expectedDeleteCalls = size / bulkDeleteSize + (size % bulkDeleteSize > 0 ? 1 : 0); + + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn( + CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build()) + ); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionRef = new AtomicReference<>(); + blobContainer.deleteBlobsAsyncIgnoringIfNotExists(blobNames, new ActionListener() { + @Override + public void onResponse(Void aVoid) { + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }); + + latch.await(); + + assertNull(exceptionRef.get()); + + ArgumentCaptor deleteRequestCaptor = ArgumentCaptor.forClass(DeleteObjectsRequest.class); + verify(s3AsyncClient, times(expectedDeleteCalls)).deleteObjects(deleteRequestCaptor.capture()); + + DeleteObjectsRequest capturedRequest = deleteRequestCaptor.getAllValues().stream().findAny().get(); + assertEquals(bucketName, capturedRequest.bucket()); + int totalBlobsDeleted = deleteRequestCaptor.getAllValues() + .stream() + .map(r -> r.delete().objects().size()) + .reduce(Integer::sum) + .get(); + assertEquals(blobNames.size(), totalBlobsDeleted); + List deletedKeys = deleteRequestCaptor.getAllValues() + .stream() + .flatMap(r -> r.delete().objects().stream()) + .map(ObjectIdentifier::key) + .collect(Collectors.toList()); + assertTrue(deletedKeys.containsAll(blobNames)); + } + + public void testDeleteBlobsAsyncIgnoringIfNotExistsFailure() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.getBulkDeletesSize()).thenReturn(1000); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create( + s3AsyncClient, + s3AsyncClient, + s3AsyncClient, + null + ); + when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials); + + // Simulate a failure in deleteObjects + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(new RuntimeException("Simulated delete failure")); + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(failedFuture); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + List blobNames = Arrays.asList("blob1", "blob2", "blob3"); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionRef = new AtomicReference<>(); + blobContainer.deleteBlobsAsyncIgnoringIfNotExists(blobNames, new ActionListener() { + @Override + public void onResponse(Void aVoid) { + fail("Expected a failure, but got a success response"); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }); + + latch.await(); + + assertNotNull(exceptionRef.get()); + assertEquals(IOException.class, exceptionRef.get().getClass()); + assertEquals("Failed to delete blobs " + blobNames, exceptionRef.get().getMessage()); + assertNotNull(exceptionRef.get().getCause()); + assertEquals("java.lang.RuntimeException: Simulated delete failure", exceptionRef.get().getCause().getMessage()); + } + + public void testDeleteBlobsAsyncIgnoringIfNotExistsWithEmptyList() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.getBulkDeletesSize()).thenReturn(1000); + + final S3AsyncClient s3AsyncClient = mock(S3AsyncClient.class); + final AmazonAsyncS3Reference asyncClientReference = mock(AmazonAsyncS3Reference.class); + when(blobStore.asyncClientReference()).thenReturn(asyncClientReference); + AmazonAsyncS3WithCredentials amazonAsyncS3WithCredentials = AmazonAsyncS3WithCredentials.create( + s3AsyncClient, + s3AsyncClient, + s3AsyncClient, + null + ); + when(asyncClientReference.get()).thenReturn(amazonAsyncS3WithCredentials); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + List emptyBlobNames = Collections.emptyList(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean onResponseCalled = new AtomicBoolean(false); + AtomicReference exceptionRef = new AtomicReference<>(); + + blobContainer.deleteBlobsAsyncIgnoringIfNotExists(emptyBlobNames, new ActionListener() { + @Override + public void onResponse(Void aVoid) { + onResponseCalled.set(true); + latch.countDown(); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }); + + latch.await(); + + assertTrue("onResponse should have been called", onResponseCalled.get()); + assertNull("No exception should have been thrown", exceptionRef.get()); + + // Verify that no interactions with S3AsyncClient occurred + verifyNoInteractions(s3AsyncClient); + } + + public void testDeleteBlobsAsyncIgnoringIfNotExistsInitializationFailure() throws Exception { + final String bucketName = randomAlphaOfLengthBetween(1, 10); + final BlobPath blobPath = new BlobPath(); + + final S3BlobStore blobStore = mock(S3BlobStore.class); + when(blobStore.bucket()).thenReturn(bucketName); + when(blobStore.getStatsMetricPublisher()).thenReturn(new StatsMetricPublisher()); + when(blobStore.getBulkDeletesSize()).thenReturn(1000); + + // Simulate a failure when getting the asyncClientReference + RuntimeException simulatedFailure = new RuntimeException("Simulated initialization failure"); + when(blobStore.asyncClientReference()).thenThrow(simulatedFailure); + + final S3BlobContainer blobContainer = new S3BlobContainer(blobPath, blobStore); + + List blobNames = Arrays.asList("blob1", "blob2", "blob3"); + + CountDownLatch latch = new CountDownLatch(1); + AtomicReference exceptionRef = new AtomicReference<>(); + + blobContainer.deleteBlobsAsyncIgnoringIfNotExists(blobNames, new ActionListener() { + @Override + public void onResponse(Void aVoid) { + fail("Expected a failure, but got a success response"); + } + + @Override + public void onFailure(Exception e) { + exceptionRef.set(e); + latch.countDown(); + } + }); + + latch.await(); + + assertNotNull("An exception should have been thrown", exceptionRef.get()); + assertTrue("Exception should be an IOException", exceptionRef.get() instanceof IOException); + assertEquals("Failed to initiate async blob deletion", exceptionRef.get().getMessage()); + assertEquals(simulatedFailure, exceptionRef.get().getCause()); + } + private void mockObjectResponse(S3AsyncClient s3AsyncClient, String bucketName, String blobName, int objectSize) { final InputStream inputStream = new ByteArrayInputStream(randomByteArrayOfLength(objectSize)); diff --git a/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelperTests.java b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelperTests.java new file mode 100644 index 0000000000000..d7f924e05cc70 --- /dev/null +++ b/plugins/repository-s3/src/test/java/org/opensearch/repositories/s3/async/S3AsyncDeleteHelperTests.java @@ -0,0 +1,236 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3.async; + +import software.amazon.awssdk.metrics.MetricPublisher; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; +import software.amazon.awssdk.services.s3.model.S3Error; + +import org.opensearch.repositories.s3.S3BlobStore; +import org.opensearch.repositories.s3.StatsMetricPublisher; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class S3AsyncDeleteHelperTests extends OpenSearchTestCase { + + @Mock + private S3AsyncClient s3AsyncClient; + + @Mock + private S3BlobStore blobStore; + + @Override + public void setUp() throws Exception { + super.setUp(); + MockitoAnnotations.openMocks(this); + } + + public void testExecuteDeleteChain() { + List objectsToDelete = Arrays.asList("key1", "key2", "key3"); + CompletableFuture currentChain = CompletableFuture.completedFuture(null); + + // Mock the deleteObjects method of S3AsyncClient + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn( + CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build()) + ); + + // Mock the getBulkDeletesSize method of S3BlobStore + when(blobStore.getBulkDeletesSize()).thenReturn(2); + + // Mock the getStatsMetricPublisher method of S3BlobStore to return a non-null value + StatsMetricPublisher mockMetricPublisher = mock(StatsMetricPublisher.class); + MetricPublisher mockDeleteObjectsMetricPublisher = mock(MetricPublisher.class); + when(blobStore.getStatsMetricPublisher()).thenReturn(mockMetricPublisher); + when(mockMetricPublisher.getDeleteObjectsMetricPublisher()).thenReturn(mockDeleteObjectsMetricPublisher); + + CompletableFuture newChain = S3AsyncDeleteHelper.executeDeleteChain( + s3AsyncClient, + blobStore, + objectsToDelete, + currentChain, + null + ); + + // Verify that the newChain is completed without any exceptions + assertNotNull(newChain); + assertTrue(newChain.isDone()); + assertFalse(newChain.isCompletedExceptionally()); + + // Verify that the deleteObjects method of S3AsyncClient was called with the expected request + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(DeleteObjectsRequest.class); + verify(s3AsyncClient, times(2)).deleteObjects(requestCaptor.capture()); + + List capturedRequests = requestCaptor.getAllValues(); + assertEquals(2, capturedRequests.size()); + + // Verify that the requests have the expected metric publisher added + for (DeleteObjectsRequest request : capturedRequests) { + assertNotNull(request.overrideConfiguration()); + assertTrue(request.overrideConfiguration().get().metricPublishers().contains(mockDeleteObjectsMetricPublisher)); + } + } + + public void testCreateDeleteBatches() { + List keys = Arrays.asList("key1", "key2", "key3", "key4", "key5", "key6"); + int bulkDeleteSize = 3; + + List> batches = S3AsyncDeleteHelper.createDeleteBatches(keys, bulkDeleteSize); + + assertEquals(2, batches.size()); + assertEquals(Arrays.asList("key1", "key2", "key3"), batches.get(0)); + assertEquals(Arrays.asList("key4", "key5", "key6"), batches.get(1)); + } + + public void testExecuteSingleDeleteBatch() throws Exception { + List batch = Arrays.asList("key1", "key2"); + DeleteObjectsResponse expectedResponse = DeleteObjectsResponse.builder().build(); + + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(CompletableFuture.completedFuture(expectedResponse)); + + // Mock the getStatsMetricPublisher method of S3BlobStore to return a non-null value + StatsMetricPublisher mockMetricPublisher = mock(StatsMetricPublisher.class); + MetricPublisher mockDeleteObjectsMetricPublisher = mock(MetricPublisher.class); + when(blobStore.getStatsMetricPublisher()).thenReturn(mockMetricPublisher); + when(mockMetricPublisher.getDeleteObjectsMetricPublisher()).thenReturn(mockDeleteObjectsMetricPublisher); + + CompletableFuture future = S3AsyncDeleteHelper.executeSingleDeleteBatch(s3AsyncClient, blobStore, batch); + + assertNotNull(future); + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + future.join(); // Wait for the CompletableFuture to complete + + // Verify that the deleteObjects method of S3AsyncClient was called with the expected request + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(DeleteObjectsRequest.class); + verify(s3AsyncClient).deleteObjects(requestCaptor.capture()); + + DeleteObjectsRequest capturedRequest = requestCaptor.getValue(); + assertEquals(blobStore.bucket(), capturedRequest.bucket()); + assertEquals(batch.size(), capturedRequest.delete().objects().size()); + assertTrue(capturedRequest.delete().objects().stream().map(ObjectIdentifier::key).collect(Collectors.toList()).containsAll(batch)); + } + + public void testProcessDeleteResponse() { + DeleteObjectsResponse response = DeleteObjectsResponse.builder() + .errors( + Arrays.asList( + S3Error.builder().key("key1").code("Code1").message("Message1").build(), + S3Error.builder().key("key2").code("Code2").message("Message2").build() + ) + ) + .build(); + + // Call the processDeleteResponse method + S3AsyncDeleteHelper.processDeleteResponse(response); + } + + public void testBulkDelete() { + List blobs = Arrays.asList("key1", "key2", "key3"); + String bucketName = "my-bucket"; + + // Mock the getStatsMetricPublisher method of S3BlobStore to return a non-null value + StatsMetricPublisher mockMetricPublisher = mock(StatsMetricPublisher.class); + MetricPublisher mockDeleteObjectsMetricPublisher = mock(MetricPublisher.class); + when(blobStore.getStatsMetricPublisher()).thenReturn(mockMetricPublisher); + when(mockMetricPublisher.getDeleteObjectsMetricPublisher()).thenReturn(mockDeleteObjectsMetricPublisher); + + DeleteObjectsRequest request = S3AsyncDeleteHelper.bulkDelete(bucketName, blobs, blobStore); + + assertEquals(bucketName, request.bucket()); + assertEquals(blobs.size(), request.delete().objects().size()); + assertTrue(request.delete().objects().stream().map(ObjectIdentifier::key).collect(Collectors.toList()).containsAll(blobs)); + assertNotNull(request.overrideConfiguration()); + assertTrue(request.overrideConfiguration().get().metricPublishers().contains(mockDeleteObjectsMetricPublisher)); + } + + public void testExecuteDeleteBatches() { + List> batches = Arrays.asList(Arrays.asList("key1", "key2"), Arrays.asList("key3", "key4")); + DeleteObjectsResponse expectedResponse = DeleteObjectsResponse.builder().build(); + + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn(CompletableFuture.completedFuture(expectedResponse)); + + // Mock the getStatsMetricPublisher method of S3BlobStore to return a non-null value + StatsMetricPublisher mockMetricPublisher = mock(StatsMetricPublisher.class); + MetricPublisher mockDeleteObjectsMetricPublisher = mock(MetricPublisher.class); + when(blobStore.getStatsMetricPublisher()).thenReturn(mockMetricPublisher); + when(mockMetricPublisher.getDeleteObjectsMetricPublisher()).thenReturn(mockDeleteObjectsMetricPublisher); + + CompletableFuture future = S3AsyncDeleteHelper.executeDeleteBatches(s3AsyncClient, blobStore, batches); + + assertNotNull(future); + assertTrue(future.isDone()); + assertFalse(future.isCompletedExceptionally()); + future.join(); // Wait for the CompletableFuture to complete + + // Verify that the deleteObjects method of S3AsyncClient was called with the expected requests + ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(DeleteObjectsRequest.class); + verify(s3AsyncClient, times(2)).deleteObjects(requestCaptor.capture()); + + List capturedRequests = requestCaptor.getAllValues(); + assertEquals(2, capturedRequests.size()); + for (DeleteObjectsRequest request : capturedRequests) { + assertNotNull(request.overrideConfiguration()); + assertTrue(request.overrideConfiguration().get().metricPublishers().contains(mockDeleteObjectsMetricPublisher)); + } + } + + public void testExecuteDeleteChainWithAfterDeleteAction() { + List objectsToDelete = Arrays.asList("key1", "key2", "key3"); + CompletableFuture currentChain = CompletableFuture.completedFuture(null); + Runnable afterDeleteAction = mock(Runnable.class); + + // Mock the deleteObjects method of S3AsyncClient + when(s3AsyncClient.deleteObjects(any(DeleteObjectsRequest.class))).thenReturn( + CompletableFuture.completedFuture(DeleteObjectsResponse.builder().build()) + ); + + // Mock the getBulkDeletesSize method of S3BlobStore + when(blobStore.getBulkDeletesSize()).thenReturn(2); + + // Mock the getStatsMetricPublisher method of S3BlobStore to return a non-null value + StatsMetricPublisher mockMetricPublisher = mock(StatsMetricPublisher.class); + MetricPublisher mockDeleteObjectsMetricPublisher = mock(MetricPublisher.class); + when(blobStore.getStatsMetricPublisher()).thenReturn(mockMetricPublisher); + when(mockMetricPublisher.getDeleteObjectsMetricPublisher()).thenReturn(mockDeleteObjectsMetricPublisher); + + CompletableFuture newChain = S3AsyncDeleteHelper.executeDeleteChain( + s3AsyncClient, + blobStore, + objectsToDelete, + currentChain, + afterDeleteAction + ); + + // Verify that the newChain is completed without any exceptions + assertNotNull(newChain); + assertTrue(newChain.isDone()); + assertFalse(newChain.isCompletedExceptionally()); + + // Verify that the afterDeleteAction was called + verify(afterDeleteAction).run(); + } + +} diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java index d45b4e3deb798..875f11203281a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/multipart/mocks/MockFsAsyncBlobContainer.java @@ -1,3 +1,4 @@ + /* * SPDX-License-Identifier: Apache-2.0 * @@ -12,6 +13,7 @@ import org.opensearch.common.StreamContext; import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.DeleteResult; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.blobstore.stream.read.ReadContext; @@ -146,4 +148,14 @@ public boolean remoteIntegrityCheckSupported() { private boolean isSegmentFile(String filename) { return !filename.endsWith(".tlog") && !filename.endsWith(".ckp"); } + + @Override + public void deleteAsync(ActionListener completionListener) { + throw new UnsupportedOperationException("deleteAsync"); + } + + @Override + public void deleteBlobsAsyncIgnoringIfNotExists(List blobNames, ActionListener completionListener) { + throw new UnsupportedOperationException("deleteBlobsAsyncIgnoringIfNotExists"); + } } diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java index 97f304d776f5c..b769cdc2fe7ab 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamBlobContainer.java @@ -14,6 +14,7 @@ import org.opensearch.core.action.ActionListener; import java.io.IOException; +import java.util.List; /** * An extension of {@link BlobContainer} that adds {@link AsyncMultiStreamBlobContainer#asyncBlobUpload} to allow @@ -48,4 +49,8 @@ public interface AsyncMultiStreamBlobContainer extends BlobContainer { * by underlying blobContainer. In this case, caller doesn't need to ensure integrity of data. */ boolean remoteIntegrityCheckSupported(); + + void deleteAsync(ActionListener completionListener); + + void deleteBlobsAsyncIgnoringIfNotExists(List blobNames, ActionListener completionListener); } diff --git a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java index 82bc7a0baed50..286c01f9dca44 100644 --- a/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java +++ b/server/src/main/java/org/opensearch/common/blobstore/AsyncMultiStreamEncryptedBlobContainer.java @@ -171,4 +171,14 @@ private InputStreamContainer decryptInputStreamContainer(InputStreamContainer in return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos); } } + + @Override + public void deleteAsync(ActionListener completionListener) { + blobContainer.deleteAsync(completionListener); + } + + @Override + public void deleteBlobsAsyncIgnoringIfNotExists(List blobNames, ActionListener completionListener) { + blobContainer.deleteBlobsAsyncIgnoringIfNotExists(blobNames, completionListener); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index f946cb6bc77f6..3ef063b03d738 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -781,7 +781,10 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED, RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX, RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX, + + // Snapshot related Settings BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING, + BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, // Composite index settings CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING, diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 2dde651a7a042..3d066e4d70990 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -50,6 +50,7 @@ import org.opensearch.action.ActionRunnable; import org.opensearch.action.StepListener; import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; import org.opensearch.cluster.RepositoryCleanupInProgress; @@ -69,6 +70,7 @@ import org.opensearch.common.Randomness; import org.opensearch.common.SetOnce; import org.opensearch.common.UUIDs; +import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer; import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; @@ -181,6 +183,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -354,6 +357,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp Setting.Property.Final ); + /** + * Controls the fixed prefix for the snapshot shard blob path. cluster.snapshot.async-deletion.enable + */ + public static final Setting SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING = Setting.boolSetting( + "cluster.snapshot.async-deletion.enable", + true, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + protected volatile boolean supportURLRepo; private volatile int maxShardBlobDeleteBatch; @@ -447,6 +460,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final String snapshotShardPathPrefix; + private volatile boolean enableAsyncDeletion; + /** * Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for * {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart @@ -502,6 +517,8 @@ protected BlobStoreRepository( this.recoverySettings = recoverySettings; this.remoteStoreSettings = new RemoteStoreSettings(clusterService.getSettings(), clusterService.getClusterSettings()); this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings()); + this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings()); + clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion); } @Override @@ -2156,7 +2173,7 @@ private void executeOneStaleIndexDelete( } // Finally, we delete the [base_path]/indexId folder - deleteResult = deleteResult.add(indexEntry.getValue().delete()); // Deleting the index folder + deleteResult = deleteResult.add(deleteContainer(indexEntry.getValue())); // Deleting the index folder logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); return deleteResult; } catch (IOException e) { @@ -2189,6 +2206,21 @@ private void executeOneStaleIndexDelete( })); } + private DeleteResult deleteContainer(BlobContainer container) throws IOException { + long startTime = System.nanoTime(); + DeleteResult deleteResult; + if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) { + // Use deleteAsync and wait for the result + PlainActionFuture future = new PlainActionFuture<>(); + ((AsyncMultiStreamBlobContainer) container).deleteAsync(future); + deleteResult = future.actionGet(); + } else { + deleteResult = container.delete(); + } + logger.debug(new ParameterizedMessage("[{}] Deleted {} in {}ns", metadata.name(), container.path(), startTime - System.nanoTime())); + return deleteResult; + } + /** * Cleans up the remote store directory if needed. *

This method cleans up segments in the remote store directory for deleted indices. @@ -2392,7 +2424,7 @@ void releaseRemoteStoreLocksAndCleanup( * @return A DeleteResult object representing the result of the deletion operation. * @throws IOException If an I/O error occurs during the deletion process. */ - private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException { + private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException, ExecutionException, InterruptedException { // If the provided ShardInfo is null, return a zero DeleteResult if (shardInfo == null) { return DeleteResult.ZERO; @@ -2404,7 +2436,7 @@ private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException { // Iterate over the shards and delete each shard's data for (int i = 0; i < shardInfo.getShardCount(); i++) { // Call the delete method on the shardContainer and accumulate the result - deleteResult = deleteResult.add(shardContainer(shardInfo.getIndexId(), i).delete()); + deleteResult = deleteResult.add(deleteContainer(shardContainer(shardInfo.getIndexId(), i))); } // Return the accumulated DeleteResult @@ -2813,7 +2845,23 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna private void deleteFromContainer(BlobContainer container, List blobs) throws IOException { logger.trace(() -> new ParameterizedMessage("[{}] Deleting {} from [{}]", metadata.name(), blobs, container.path())); - container.deleteBlobsIgnoringIfNotExists(blobs); + long startTime = System.nanoTime(); + if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) { + PlainActionFuture future = new PlainActionFuture<>(); + ((AsyncMultiStreamBlobContainer) container).deleteBlobsAsyncIgnoringIfNotExists(blobs, future); + future.actionGet(); + } else { + container.deleteBlobsIgnoringIfNotExists(blobs); + } + logger.debug( + () -> new ParameterizedMessage( + "[{}] Deletion {} from [{}] took {}ns", + metadata.name(), + blobs, + container.path(), + System.nanoTime() - startTime + ) + ); } private BlobPath indicesPath() { @@ -4718,4 +4766,8 @@ public String toString() { return name; } } + + public void setEnableAsyncDeletion(boolean enableAsyncDeletion) { + this.enableAsyncDeletion = enableAsyncDeletion; + } } diff --git a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java index 10e4cc6cfb1ef..fddc6c0c94005 100644 --- a/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java +++ b/server/src/test/java/org/opensearch/index/translog/transfer/BlobStoreTransferServiceTests.java @@ -17,6 +17,7 @@ import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.DeleteResult; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.blobstore.stream.read.ReadContext; @@ -51,6 +52,7 @@ import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.Base64; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -340,5 +342,15 @@ public boolean remoteIntegrityCheckSupported() { public BlobContainer getDelegate() { return delegate; } + + @Override + public void deleteBlobsAsyncIgnoringIfNotExists(List blobNames, ActionListener completionListener) { + throw new RuntimeException("deleteBlobsAsyncIgnoringIfNotExists not supported"); + } + + @Override + public void deleteAsync(ActionListener completionListener) { + throw new RuntimeException("deleteAsync not supported"); + } } } diff --git a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java index c5f36fcc01983..d9746e883d6df 100644 --- a/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/opensearch/snapshots/BlobStoreFormatTests.java @@ -39,6 +39,7 @@ import org.opensearch.common.blobstore.BlobMetadata; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.blobstore.DeleteResult; import org.opensearch.common.blobstore.fs.FsBlobContainer; import org.opensearch.common.blobstore.fs.FsBlobStore; import org.opensearch.common.blobstore.stream.read.ReadContext; @@ -63,6 +64,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Path; +import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -335,5 +337,15 @@ public boolean remoteIntegrityCheckSupported() { public BlobContainer getDelegate() { return delegate; } + + @Override + public void deleteAsync(ActionListener completionListener) { + throw new RuntimeException("deleteAsync not supported"); + } + + @Override + public void deleteBlobsAsyncIgnoringIfNotExists(List blobNames, ActionListener completionListener) { + throw new RuntimeException("deleteBlobsAsyncIgnoringIfNotExists not supported"); + } } }