Skip to content

Commit

Permalink
Optimizations in s3 async upload flow
Browse files Browse the repository at this point in the history
Signed-off-by: vikasvb90 <[email protected]>
  • Loading branch information
vikasvb90 committed Nov 27, 2023
1 parent cdbbfcc commit a1ebc61
Show file tree
Hide file tree
Showing 11 changed files with 263 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ private static IrsaCredentials buildFromEnvironment(IrsaCredentials defaults) {
return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName);
}

private synchronized void releaseCachedClients() {
public synchronized void releaseCachedClients() {
// the clients will shutdown when they will not be used anymore
for (final AmazonAsyncS3Reference clientReference : clientsCache.values()) {
clientReference.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;

import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -188,10 +189,38 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
writeContext.getWritePriority(),
writeContext.getUploadFinalizer(),
writeContext.doRemoteDataIntegrityCheck(),
writeContext.getExpectedChecksum()
writeContext.getExpectedChecksum(),
blobStore.isUploadRetryEnabled()
);
try {
long partSize = blobStore.getAsyncTransferManager().calculateOptimalPartSize(writeContext.getFileSize());
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
StreamContext streamContext = SocketAccess.doPrivileged(
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
);
InputStreamContainer inputStream = streamContext.provideStream(0);
try {
executeMultipartUpload(
blobStore,
uploadRequest.getKey(),
inputStream.getInputStream(),
uploadRequest.getContentLength()
);
completionListener.onResponse(null);
} catch (Exception ex) {
logger.error(
() -> new ParameterizedMessage(
"Failed to upload large file {} of size {} ",
uploadRequest.getKey(),
uploadRequest.getContentLength()
),
ex
);
completionListener.onFailure(ex);
}
return;
}
long partSize = blobStore.getAsyncTransferManager()
.calculateOptimalPartSize(writeContext.getFileSize(), writeContext.getWritePriority(), blobStore.isUploadRetryEnabled());
StreamContext streamContext = SocketAccess.doPrivileged(() -> writeContext.getStreamProvider(partSize));
try (AmazonAsyncS3Reference amazonS3Reference = SocketAccess.doPrivileged(blobStore::asyncClientReference)) {

Expand Down Expand Up @@ -537,8 +566,14 @@ void executeSingleUpload(final S3BlobStore blobStore, final String blobName, fin

PutObjectRequest putObjectRequest = putObjectRequestBuilder.build();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
final InputStream requestInputStream;
if (blobStore.isUploadRetryEnabled()) {
requestInputStream = new BufferedInputStream(input, (int) (blobSize + 1));
} else {
requestInputStream = input;
}
SocketAccess.doPrivilegedVoid(
() -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(input, blobSize))
() -> clientReference.get().putObject(putObjectRequest, RequestBody.fromInputStream(requestInputStream, blobSize))
);
} catch (final SdkException e) {
throw new IOException("Unable to upload object [" + blobName + "] using a single upload", e);
Expand Down Expand Up @@ -578,6 +613,13 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
createMultipartUploadRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
}

final InputStream requestInputStream;
if (blobStore.isUploadRetryEnabled()) {
requestInputStream = new BufferedInputStream(input, (int) (partSize + 1));
} else {
requestInputStream = input;
}

CreateMultipartUploadRequest createMultipartUploadRequest = createMultipartUploadRequestBuilder.build();
try (AmazonS3Reference clientReference = blobStore.clientReference()) {
uploadId.set(
Expand All @@ -601,10 +643,9 @@ void executeMultipartUpload(final S3BlobStore blobStore, final String blobName,
.build();

bytesCount += uploadPartRequest.contentLength();

final UploadPartResponse uploadResponse = SocketAccess.doPrivileged(
() -> clientReference.get()
.uploadPart(uploadPartRequest, RequestBody.fromInputStream(input, uploadPartRequest.contentLength()))
.uploadPart(uploadPartRequest, RequestBody.fromInputStream(requestInputStream, uploadPartRequest.contentLength()))
);
parts.add(CompletedPart.builder().partNumber(uploadPartRequest.partNumber()).eTag(uploadResponse.eTag()).build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,10 @@
import static org.opensearch.repositories.s3.S3Repository.BUFFER_SIZE_SETTING;
import static org.opensearch.repositories.s3.S3Repository.BULK_DELETE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.CANNED_ACL_SETTING;
import static org.opensearch.repositories.s3.S3Repository.REDIRECT_LARGE_S3_UPLOAD;
import static org.opensearch.repositories.s3.S3Repository.SERVER_SIDE_ENCRYPTION_SETTING;
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED;

class S3BlobStore implements BlobStore {

Expand All @@ -71,6 +73,10 @@ class S3BlobStore implements BlobStore {

private volatile ByteSizeValue bufferSize;

private volatile boolean redirectLargeUploads;

private volatile boolean uploadRetryEnabled;

private volatile boolean serverSideEncryption;

private volatile ObjectCannedACL cannedACL;
Expand Down Expand Up @@ -119,6 +125,9 @@ class S3BlobStore implements BlobStore {
this.normalExecutorBuilder = normalExecutorBuilder;
this.priorityExecutorBuilder = priorityExecutorBuilder;
this.urgentExecutorBuilder = urgentExecutorBuilder;
// Settings to initialize blobstore with.
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
}

@Override
Expand All @@ -130,6 +139,8 @@ public void reload(RepositoryMetadata repositoryMetadata) {
this.cannedACL = initCannedACL(CANNED_ACL_SETTING.get(repositoryMetadata.settings()));
this.storageClass = initStorageClass(STORAGE_CLASS_SETTING.get(repositoryMetadata.settings()));
this.bulkDeletesSize = BULK_DELETE_SIZE.get(repositoryMetadata.settings());
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());

Check warning on line 143 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java#L142-L143

Added lines #L142 - L143 were not covered by tests
}

@Override
Expand All @@ -149,6 +160,14 @@ int getMaxRetries() {
return service.settings(repositoryMetadata).maxRetries;
}

public boolean isRedirectLargeUploads() {
return redirectLargeUploads;

Check warning on line 164 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobStore.java#L164

Added line #L164 was not covered by tests
}

public boolean isUploadRetryEnabled() {
return uploadRetryEnabled;
}

public String bucket() {
return bucket;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,20 @@ class S3Repository extends MeteredBlobStoreRepository {
*/
static final ByteSizeValue MAX_FILE_SIZE_USING_MULTIPART = new ByteSizeValue(5, ByteSizeUnit.TB);

/**
* Whether large uploads need to be redirected to slow sync s3 client.
*/
static final Setting<Boolean> REDIRECT_LARGE_S3_UPLOAD = Setting.boolSetting(
"redirect_large_s3_upload",
true,
Setting.Property.NodeScope
);

/**
* Whether retry on uploads are enabled. This setting wraps inputstream with buffered stream to enable retries.
*/
static final Setting<Boolean> UPLOAD_RETRY_ENABLED = Setting.boolSetting("s3_upload_retry_enabled", true, Setting.Property.NodeScope);

/**
* Minimum threshold below which the chunk is uploaded using a single request. Beyond this threshold,
* the S3 repository will use the AWS Multipart Upload API to split the chunk into several parts, each of buffer_size length, and
Expand Down Expand Up @@ -391,7 +405,9 @@ public void reload(RepositoryMetadata newRepositoryMetadata) {

// Reload configs for S3RepositoryPlugin
service.settings(metadata);
service.releaseCachedClients();

Check warning on line 408 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java#L408

Added line #L408 was not covered by tests
s3AsyncService.settings(metadata);
s3AsyncService.releaseCachedClients();

Check warning on line 410 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3Repository.java#L410

Added line #L410 was not covered by tests

// Reload configs for S3BlobStore
BlobStore blobStore = getBlobStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ public List<Setting<?>> getSettings() {
S3ClientSettings.IDENTITY_TOKEN_FILE_SETTING,
S3ClientSettings.ROLE_SESSION_NAME_SETTING,
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING,
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING,
S3Repository.REDIRECT_LARGE_S3_UPLOAD,
S3Repository.UPLOAD_RETRY_ENABLED
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ private static IrsaCredentials buildFromEnviroment(IrsaCredentials defaults) {
return new IrsaCredentials(webIdentityTokenFile, roleArn, roleSessionName);
}

private synchronized void releaseCachedClients() {
public synchronized void releaseCachedClients() {
// the clients will shutdown when they will not be used anymore
for (final AmazonS3Reference clientReference : clientsCache.values()) {
clientReference.decRef();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;
Expand Down Expand Up @@ -55,8 +54,8 @@ public class AsyncPartsHandler {
* @param uploadId Upload Id against which multi-part is being performed
* @param completedParts Reference of completed parts
* @param inputStreamContainers Checksum containers
* @return list of completable futures
* @param statsMetricPublisher sdk metric publisher
* @return list of completable futures
* @throws IOException thrown in case of an IO error
*/
public static List<CompletableFuture<CompletedPart>> uploadParts(
Expand All @@ -69,7 +68,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
String uploadId,
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
StatsMetricPublisher statsMetricPublisher
StatsMetricPublisher statsMetricPublisher,
boolean uploadRetryEnabled
) throws IOException {
List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
Expand All @@ -95,7 +95,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
futures,
uploadPartRequestBuilder.build(),
inputStreamContainer,
uploadRequest
uploadRequest,
uploadRetryEnabled
);
}

Expand Down Expand Up @@ -132,6 +133,18 @@ public static void cleanUpParts(S3AsyncClient s3AsyncClient, UploadRequest uploa
}));
}

public static InputStream maybeRetryInputStream(
InputStream inputStream,
WritePriority writePriority,
boolean uploadRetryEnabled,
long contentLength
) {
if (uploadRetryEnabled == true && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) {
return new BufferedInputStream(inputStream, (int) (contentLength + 1));
}
return inputStream;
}

private static void uploadPart(
S3AsyncClient s3AsyncClient,
ExecutorService executorService,
Expand All @@ -142,7 +155,8 @@ private static void uploadPart(
List<CompletableFuture<CompletedPart>> futures,
UploadPartRequest uploadPartRequest,
InputStreamContainer inputStreamContainer,
UploadRequest uploadRequest
UploadRequest uploadRequest,
boolean uploadRetryEnabled
) {
Integer partNumber = uploadPartRequest.partNumber();

Expand All @@ -154,9 +168,13 @@ private static void uploadPart(
} else {
streamReadExecutor = executorService;
}
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));

InputStream inputStream = maybeRetryInputStream(
inputStreamContainer.getInputStream(),
uploadRequest.getWritePriority(),
uploadRetryEnabled,
uploadPartRequest.contentLength()
);
CompletableFuture<UploadPartResponse> uploadPartResponseFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.uploadPart(
uploadPartRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.common.util.ByteUtils;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
Expand Down Expand Up @@ -183,7 +183,8 @@ private void doUploadInParts(
uploadId,
completedParts,
inputStreamContainers,
statsMetricPublisher
statsMetricPublisher,
uploadRequest.isUploadRetryEnabled()
);
} catch (Exception ex) {
try {
Expand Down Expand Up @@ -302,10 +303,13 @@ private static void handleException(CompletableFuture<Void> returnFuture, Suppli
/**
* Calculates the optimal part size of each part request if the upload operation is carried out as multipart upload.
*/
public long calculateOptimalPartSize(long contentLengthOfSource) {
public long calculateOptimalPartSize(long contentLengthOfSource, WritePriority writePriority, boolean uploadRetryEnabled) {
if (contentLengthOfSource < ByteSizeUnit.MB.toBytes(5)) {
return contentLengthOfSource;
}
if (uploadRetryEnabled && (writePriority == WritePriority.HIGH || writePriority == WritePriority.URGENT)) {
return new ByteSizeValue(5, ByteSizeUnit.MB).getBytes();

Check warning on line 311 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/async/AsyncTransferManager.java#L311

Added line #L311 was not covered by tests
}
double optimalPartSize = contentLengthOfSource / (double) MAX_UPLOAD_PARTS;
optimalPartSize = Math.ceil(optimalPartSize);
return (long) Math.max(optimalPartSize, minimumPartSize);
Expand Down Expand Up @@ -335,9 +339,13 @@ private void uploadInOneChunk(
} else {
streamReadExecutor = executorService;
}
// Buffered stream is needed to allow mark and reset ops during IO errors so that only buffered
// data can be retried instead of retrying whole file by the application.
InputStream inputStream = new BufferedInputStream(inputStreamContainer.getInputStream(), (int) (ByteSizeUnit.MB.toBytes(1) + 1));

InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream(
inputStreamContainer.getInputStream(),
uploadRequest.getWritePriority(),
uploadRequest.isUploadRetryEnabled(),
uploadRequest.getContentLength()
);
CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(
() -> s3AsyncClient.putObject(
putObjectRequestBuilder.build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ public class UploadRequest {
private final boolean doRemoteDataIntegrityCheck;
private final Long expectedChecksum;

private boolean uploadRetryEnabled;

/**
* Construct a new UploadRequest object
*
Expand All @@ -43,7 +45,8 @@ public UploadRequest(
WritePriority writePriority,
CheckedConsumer<Boolean, IOException> uploadFinalizer,
boolean doRemoteDataIntegrityCheck,
Long expectedChecksum
Long expectedChecksum,
boolean uploadRetryEnabled
) {
this.bucket = bucket;
this.key = key;
Expand All @@ -52,6 +55,7 @@ public UploadRequest(
this.uploadFinalizer = uploadFinalizer;
this.doRemoteDataIntegrityCheck = doRemoteDataIntegrityCheck;
this.expectedChecksum = expectedChecksum;
this.uploadRetryEnabled = uploadRetryEnabled;
}

public String getBucket() {
Expand Down Expand Up @@ -81,4 +85,8 @@ public boolean doRemoteDataIntegrityCheck() {
public Long getExpectedChecksum() {
return expectedChecksum;
}

public boolean isUploadRetryEnabled() {
return uploadRetryEnabled;
}
}
Loading

0 comments on commit a1ebc61

Please sign in to comment.