Skip to content

Commit

Permalink
Use async client for delete blob or path in S3 Blob Container
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Dec 16, 2024
1 parent e46d4bc commit 5bbc7ff
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 167 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
// Disable request throttling because some random values in tests might generate too many failures for the S3 client
.put(S3ClientSettings.USE_THROTTLE_RETRIES_SETTING.getConcreteSettingForNamespace("test").getKey(), false)
.put(S3ClientSettings.PROXY_TYPE_SETTING.getConcreteSettingForNamespace("test").getKey(), ProxySettings.ProxyType.DIRECT)
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
.put(super.nodeSettings(nodeOrdinal))
.setSecureSettings(secureSettings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,7 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes

@Override
protected Settings nodeSettings() {
return Settings.builder()
.put(super.nodeSettings())
.put(BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.getKey(), false)
.build();
return Settings.builder().put(super.nodeSettings()).build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@
import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesRequest;
import software.amazon.awssdk.services.s3.model.GetObjectAttributesResponse;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
Expand All @@ -55,9 +52,7 @@
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.ObjectAttributes;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Error;
import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
Expand All @@ -68,7 +63,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.common.Nullable;
import org.opensearch.common.SetOnce;
import org.opensearch.common.StreamContext;
Expand Down Expand Up @@ -101,11 +96,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
Expand Down Expand Up @@ -381,125 +373,17 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
}

@Override
public DeleteResult delete() throws IOException {
final AtomicLong deletedBlobs = new AtomicLong();
final AtomicLong deletedBytes = new AtomicLong();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
ListObjectsV2Iterable listObjectsIterable = SocketAccess.doPrivileged(
() -> clientReference.get()
.listObjectsV2Paginator(
ListObjectsV2Request.builder()
.bucket(blobStore.bucket())
.prefix(keyPath)
.overrideConfiguration(
o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().listObjectsMetricPublisher)
)
.build()
)
);

Iterator<ListObjectsV2Response> listObjectsResponseIterator = listObjectsIterable.iterator();
while (listObjectsResponseIterator.hasNext()) {
ListObjectsV2Response listObjectsResponse = SocketAccess.doPrivileged(listObjectsResponseIterator::next);
List<String> blobsToDelete = listObjectsResponse.contents().stream().map(s3Object -> {
deletedBlobs.incrementAndGet();
deletedBytes.addAndGet(s3Object.size());

return s3Object.key();
}).collect(Collectors.toList());

if (!listObjectsResponseIterator.hasNext()) {
blobsToDelete.add(keyPath);
}

doDeleteBlobs(blobsToDelete, false);
}
} catch (SdkException e) {
throw new IOException("Exception when deleting blob container [" + keyPath + "]", e);
}

return new DeleteResult(deletedBlobs.get(), deletedBytes.get());
public DeleteResult delete() {
PlainActionFuture<DeleteResult> future = new PlainActionFuture<>();
deleteAsync(future);
return future.actionGet();
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
doDeleteBlobs(blobNames, true);
}

private void doDeleteBlobs(List<String> blobNames, boolean relative) throws IOException {
if (blobNames.isEmpty()) {
return;
}
final Set<String> outstanding;
if (relative) {
outstanding = blobNames.stream().map(this::buildKey).collect(Collectors.toSet());
} else {
outstanding = new HashSet<>(blobNames);
}
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
// S3 API allows 1k blobs per delete so we split up the given blobs into requests of bulk size deletes
final List<DeleteObjectsRequest> deleteRequests = new ArrayList<>();
final List<String> partition = new ArrayList<>();
for (String key : outstanding) {
partition.add(key);
if (partition.size() == blobStore.getBulkDeletesSize()) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
partition.clear();
}
}
if (partition.isEmpty() == false) {
deleteRequests.add(bulkDelete(blobStore.bucket(), partition));
}
SocketAccess.doPrivilegedVoid(() -> {
SdkException aex = null;
for (DeleteObjectsRequest deleteRequest : deleteRequests) {
List<String> keysInRequest = deleteRequest.delete()
.objects()
.stream()
.map(ObjectIdentifier::key)
.collect(Collectors.toList());
try {
DeleteObjectsResponse deleteObjectsResponse = clientReference.get().deleteObjects(deleteRequest);
outstanding.removeAll(keysInRequest);
outstanding.addAll(deleteObjectsResponse.errors().stream().map(S3Error::key).collect(Collectors.toSet()));
if (!deleteObjectsResponse.errors().isEmpty()) {
logger.warn(
() -> new ParameterizedMessage(
"Failed to delete some blobs {}",
deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList())
)
);
}
} catch (SdkException e) {
// The AWS client threw any unexpected exception and did not execute the request at all so we do not
// remove any keys from the outstanding deletes set.
aex = ExceptionsHelper.useOrSuppress(aex, e);
}
}
if (aex != null) {
throw aex;
}
});
} catch (Exception e) {
throw new IOException("Failed to delete blobs [" + outstanding + "]", e);
}
assert outstanding.isEmpty();
}

private DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs) {
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Delete.builder()
.objects(blobs.stream().map(blob -> ObjectIdentifier.builder().key(blob).build()).collect(Collectors.toList()))
.quiet(true)
.build()
)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
.build();
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) {
PlainActionFuture<Void> future = new PlainActionFuture<>();
deleteBlobsAsyncIgnoringIfNotExists(blobNames, future);
future.actionGet();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,6 @@ public void apply(Settings value, Settings current, Settings previous) {

// Snapshot related Settings
BlobStoreRepository.SNAPSHOT_SHARD_PATH_PREFIX_SETTING,
BlobStoreRepository.SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING,
BlobStoreRepository.SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD,

SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.RepositoryCleanupInProgress;
Expand All @@ -70,7 +69,6 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -428,16 +426,6 @@ protected static long calculateMaxWithinIntLimit(long defaultThresholdOfHeap, lo
Setting.Property.Final
);

/**
* Controls the fixed prefix for the snapshot shard blob path. cluster.snapshot.async-deletion.enable
*/
public static final Setting<Boolean> SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING = Setting.boolSetting(
"cluster.snapshot.async-deletion.enable",
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

protected volatile boolean supportURLRepo;

private volatile int maxShardBlobDeleteBatch;
Expand Down Expand Up @@ -531,8 +519,6 @@ protected static long calculateMaxWithinIntLimit(long defaultThresholdOfHeap, lo

private final String snapshotShardPathPrefix;

private volatile boolean enableAsyncDeletion;

protected final long repositoryDataCacheThreshold;

/**
Expand Down Expand Up @@ -587,8 +573,6 @@ protected BlobStoreRepository(
this.recoverySettings = recoverySettings;
this.remoteStoreSettings = new RemoteStoreSettings(clusterService.getSettings(), clusterService.getClusterSettings());
this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings());
this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion);
this.repositoryDataCacheThreshold = SNAPSHOT_REPOSITORY_DATA_CACHE_THRESHOLD.get(clusterService.getSettings()).getBytes();
}

Expand Down Expand Up @@ -2219,15 +2203,7 @@ private void executeOneStaleIndexDelete(

private DeleteResult deleteContainer(BlobContainer container) throws IOException {
long startTime = System.nanoTime();
DeleteResult deleteResult;
if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) {
// Use deleteAsync and wait for the result
PlainActionFuture<DeleteResult> future = new PlainActionFuture<>();
((AsyncMultiStreamBlobContainer) container).deleteAsync(future);
deleteResult = future.actionGet();
} else {
deleteResult = container.delete();
}
DeleteResult deleteResult = container.delete();
logger.debug(new ParameterizedMessage("[{}] Deleted {} in {}ns", metadata.name(), container.path(), startTime - System.nanoTime()));
return deleteResult;
}
Expand Down Expand Up @@ -2862,13 +2838,7 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna
private void deleteFromContainer(BlobContainer container, List<String> blobs) throws IOException {
logger.trace(() -> new ParameterizedMessage("[{}] Deleting {} from [{}]", metadata.name(), blobs, container.path()));
long startTime = System.nanoTime();
if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) {
PlainActionFuture<Void> future = new PlainActionFuture<>();
((AsyncMultiStreamBlobContainer) container).deleteBlobsAsyncIgnoringIfNotExists(blobs, future);
future.actionGet();
} else {
container.deleteBlobsIgnoringIfNotExists(blobs);
}
container.deleteBlobsIgnoringIfNotExists(blobs);
logger.debug(
() -> new ParameterizedMessage(
"[{}] Deletion {} from [{}] took {}ns",
Expand Down Expand Up @@ -4742,8 +4712,4 @@ public String toString() {
return name;
}
}

public void setEnableAsyncDeletion(boolean enableAsyncDeletion) {
this.enableAsyncDeletion = enableAsyncDeletion;
}
}

0 comments on commit 5bbc7ff

Please sign in to comment.