Permit backed futures to prevent timeouts during upload bursts
Signed-off-by: vikasvb90 <[email protected]>
vikasvb90 authored and Bhumika Saini committed May 2, 2024
1 parent ef841dd commit 72cab64
Showing 15 changed files with 1,054 additions and 78 deletions.
@@ -194,6 +194,22 @@ class S3Repository extends MeteredBlobStoreRepository {
Setting.Property.NodeScope
);

+ /**
+ * Number of retries in case of a transfer failure.
+ */
+ public static Setting<Integer> S3_MAX_TRANSFER_RETRIES = Setting.intSetting("s3_max_transfer_retries", 3, Setting.Property.NodeScope);
+
+ /**
+ * Percentage of total available permits to be available for priority transfers.
+ */
+ public static Setting<Integer> S3_PRIORITY_PERMIT_ALLOCATION_PERCENT = Setting.intSetting(
+ "s3_priority_permit_alloc_perc",
+ 70,
+ 21,
+ 80,
+ Setting.Property.NodeScope
+ );
+
/**
* Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g.
*/
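
For intuition, here is how these two settings could combine into permit pools. This is a minimal sketch assuming a simple two-semaphore split; the actual logic lives in PermitBackedRetryableFutureUtils, which is added elsewhere in this commit and not shown in this excerpt.

    // Illustration only: the split logic and names below are assumptions.
    import java.util.concurrent.Semaphore;

    class PermitSplitSketch {
        final Semaphore priorityPermits; // reserved for high/urgent priority transfers
        final Semaphore normalPermits;   // remainder, for normal priority transfers

        PermitSplitSketch(int totalPermits, double priorityFraction) {
            int priority = (int) (totalPermits * priorityFraction); // e.g. 70% by default
            this.priorityPermits = new Semaphore(priority);
            this.normalPermits = new Semaphore(totalPermits - priority);
        }
    }

With the defaults and the plugin wiring below (Math.max(allocatedProcessors * 5, 10) total permits), an 8-processor node would get 40 permits: 28 reserved for priority transfers and 12 for the rest.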
@@ -53,6 +53,7 @@
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
import org.opensearch.repositories.s3.async.AsyncTransferEventLoopGroup;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
+ import org.opensearch.repositories.s3.async.PermitBackedRetryableFutureUtils;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.FixedExecutorBuilder;
@@ -69,6 +70,8 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;

/**
@@ -81,6 +84,7 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private static final String PRIORITY_FUTURE_COMPLETION = "priority_future_completion";
private static final String PRIORITY_STREAM_READER = "priority_stream_reader";
private static final String FUTURE_COMPLETION = "future_completion";
+ private static final String REMOTE_TRANSFER_RETRY = "remote_transfer_retry";
private static final String STREAM_READER = "stream_reader";

protected final S3Service service;
@@ -91,6 +95,8 @@ public class S3RepositoryPlugin extends Plugin implements RepositoryPlugin, Relo
private AsyncExecutorContainer urgentExecutorBuilder;
private AsyncExecutorContainer priorityExecutorBuilder;
private AsyncExecutorContainer normalExecutorBuilder;
+ private ExecutorService remoteTransferRetryPool;
+ private ScheduledExecutorService scheduler;

public S3RepositoryPlugin(final Settings settings, final Path configPath) {
this(settings, configPath, new S3Service(configPath), new S3AsyncService(configPath));
@@ -120,6 +126,14 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
TimeValue.timeValueMinutes(5)
)
);
+ executorBuilders.add(
+ new ScalingExecutorBuilder(
+ REMOTE_TRANSFER_RETRY,
+ allocatedProcessors(settings),
+ allocatedProcessors(settings) * 2,
+ TimeValue.timeValueMinutes(5)
+ )
+ );
return executorBuilders;
}

@@ -189,6 +203,8 @@ public Collection<Object> createComponents(
threadPool.executor(STREAM_READER),
new AsyncTransferEventLoopGroup(normalEventLoopThreads)
);
+ this.remoteTransferRetryPool = threadPool.executor(REMOTE_TRANSFER_RETRY);
+ this.scheduler = threadPool.scheduler();
return Collections.emptyList();
}

@@ -204,7 +220,16 @@ protected S3Repository createRepository(
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING.get(clusterService.getSettings()).getBytes(),
normalExecutorBuilder.getStreamReader(),
priorityExecutorBuilder.getStreamReader(),
- urgentExecutorBuilder.getStreamReader()
+ urgentExecutorBuilder.getStreamReader(),
+ new PermitBackedRetryableFutureUtils<>(
+ S3Repository.S3_MAX_TRANSFER_RETRIES.get(clusterService.getSettings()),
+ // High permit allocation because each op acquiring permit performs disk IO, computation and network IO.
+ Math.max(allocatedProcessors(clusterService.getSettings()) * 5, 10),
+ ((double) S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT.get(clusterService.getSettings())) / 100,
+ remoteTransferRetryPool,
+ scheduler
+ )
+
);
return new S3Repository(
metadata,
@@ -263,7 +288,9 @@ public List<Setting<?>> getSettings() {
S3Repository.PARALLEL_MULTIPART_UPLOAD_MINIMUM_PART_SIZE_SETTING,
S3Repository.PARALLEL_MULTIPART_UPLOAD_ENABLED_SETTING,
S3Repository.REDIRECT_LARGE_S3_UPLOAD,
- S3Repository.UPLOAD_RETRY_ENABLED
+ S3Repository.UPLOAD_RETRY_ENABLED,
+ S3Repository.S3_MAX_TRANSFER_RETRIES,
+ S3Repository.S3_PRIORITY_PERMIT_ALLOCATION_PERCENT
);
}

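
The createRepository wiring above gives the utility five inputs: a retry budget (s3_max_transfer_retries), a permit budget sized generously because each permitted operation performs disk IO, computation, and network IO, a priority fraction, the dedicated remote_transfer_retry pool, and the shared scheduler. The class itself ships in this commit's other files; as a hedged sketch of the contract the call sites below rely on, assuming acquire-run-release with bounded resubmission (all names here are illustrative, not the committed implementation):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Semaphore;
    import java.util.function.Supplier;

    final class RetryableFutureSketch {
        // Acquire a permit, run the supplier, release on completion, and
        // resubmit failed attempts until the retry budget is exhausted.
        static <T> CompletableFuture<T> permitBackedRetryable(
            Supplier<CompletableFuture<T>> supplier,
            Semaphore permits,
            int retriesLeft,
            ExecutorService retryPool
        ) throws InterruptedException {
            permits.acquire(); // blocks the submitting thread, never an SDK completion thread
            CompletableFuture<T> result = new CompletableFuture<>();
            supplier.get().whenComplete((value, failure) -> {
                permits.release();
                if (failure == null) {
                    result.complete(value);
                } else if (retriesLeft > 0) {
                    retryPool.submit(() -> { // retry on the dedicated pool
                        try {
                            permitBackedRetryable(supplier, permits, retriesLeft - 1, retryPool)
                                .whenComplete((v, f) -> {
                                    if (f == null) result.complete(v);
                                    else result.completeExceptionally(f);
                                });
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            result.completeExceptionally(e);
                        }
                    });
                } else {
                    result.completeExceptionally(failure);
                }
            });
            return result;
        }
    }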
@@ -35,6 +35,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReferenceArray;
+ import java.util.function.Supplier;

/**
* Responsible for handling parts of the original multipart request
@@ -56,8 +57,8 @@ public class AsyncPartsHandler {
* @param inputStreamContainers Checksum containers
* @param statsMetricPublisher sdk metric publisher
* @return list of completable futures
- * @throws IOException thrown in case of an IO error
*/
+ @SuppressWarnings({ "rawtypes", "unchecked" })
public static List<CompletableFuture<CompletedPart>> uploadParts(
S3AsyncClient s3AsyncClient,
ExecutorService executorService,
@@ -69,35 +70,51 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
StatsMetricPublisher statsMetricPublisher,
- boolean uploadRetryEnabled
- ) throws IOException {
+ boolean uploadRetryEnabled,
+ PermitBackedRetryableFutureUtils permitBackedRetryableFutureUtils
+ ) throws InterruptedException {
List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
+ PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBackedRetryableFutureUtils.createRequestContext();
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
- InputStreamContainer inputStreamContainer = streamContext.provideStream(partIdx);
- inputStreamContainers.set(partIdx, new CheckedContainer(inputStreamContainer.getContentLength()));
- UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder()
- .bucket(uploadRequest.getBucket())
- .partNumber(partIdx + 1)
- .key(uploadRequest.getKey())
- .uploadId(uploadId)
- .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
- .contentLength(inputStreamContainer.getContentLength());
- if (uploadRequest.doRemoteDataIntegrityCheck()) {
- uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
- }
- uploadPart(
- s3AsyncClient,
- executorService,
- priorityExecutorService,
- urgentExecutorService,
- completedParts,
- inputStreamContainers,
- futures,
- uploadPartRequestBuilder.build(),
- inputStreamContainer,
- uploadRequest,
- uploadRetryEnabled
+ int finalPartIdx = partIdx;
+ Supplier<CompletableFuture<CompletedPart>> partFutureSupplier = () -> {
+ InputStreamContainer inputStreamContainer;
+ try {
+ inputStreamContainer = streamContext.provideStream(finalPartIdx);
+ } catch (IOException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ inputStreamContainers.set(finalPartIdx, new CheckedContainer(inputStreamContainer.getContentLength()));
+ UploadPartRequest.Builder uploadPartRequestBuilder = UploadPartRequest.builder()
+ .bucket(uploadRequest.getBucket())
+ .partNumber(finalPartIdx + 1)
+ .key(uploadRequest.getKey())
+ .uploadId(uploadId)
+ .overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
+ .contentLength(inputStreamContainer.getContentLength());
+ if (uploadRequest.doRemoteDataIntegrityCheck()) {
+ uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
+ }
+ return uploadPart(
+ s3AsyncClient,
+ executorService,
+ priorityExecutorService,
+ urgentExecutorService,
+ completedParts,
+ inputStreamContainers,
+ uploadPartRequestBuilder.build(),
+ inputStreamContainer,
+ uploadRequest,
+ uploadRetryEnabled
+ );
+ };
+
+ CompletableFuture<CompletedPart> retryableFuture = permitBackedRetryableFutureUtils.createPermitBackedRetryableFuture(
+ partFutureSupplier,
+ uploadRequest.getWritePriority(),
+ requestContext
+ );
+ futures.add(retryableFuture);
}

return futures;
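
Note the shape of partFutureSupplier above: stream creation moves inside the supplier so that every retry attempt opens a fresh InputStream at the part's offset, since a failed attempt may already have consumed its stream. The essential pattern, with a hypothetical one-argument upload helper standing in for the real multi-argument uploadPart:

    Supplier<CompletableFuture<CompletedPart>> partFutureSupplier = () -> {
        InputStreamContainer container;
        try {
            container = streamContext.provideStream(finalPartIdx); // fresh stream per attempt
        } catch (IOException e) {
            return CompletableFuture.failedFuture(e); // surface IO failures through the future
        }
        return uploadPart(container); // hypothetical wrapper; the real call takes many arguments
    };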
@@ -145,14 +162,13 @@ public static InputStream maybeRetryInputStream(
return inputStream;
}

- private static void uploadPart(
+ private static CompletableFuture<CompletedPart> uploadPart(
S3AsyncClient s3AsyncClient,
ExecutorService executorService,
ExecutorService priorityExecutorService,
ExecutorService urgentExecutorService,
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
- List<CompletableFuture<CompletedPart>> futures,
UploadPartRequest uploadPartRequest,
InputStreamContainer inputStreamContainer,
UploadRequest uploadRequest,
@@ -205,9 +221,9 @@ private static void uploadPart(
uploadRequest.doRemoteDataIntegrityCheck()
)
);
- futures.add(convertFuture);

CompletableFutureUtils.forwardExceptionTo(convertFuture, uploadPartResponseFuture);
+ return convertFuture;
}

private static CompletedPart convertUploadPartResponse(
@@ -65,6 +65,9 @@ public final class AsyncTransferManager
private final ExecutorService urgentExecutorService;
private final long minimumPartSize;

@SuppressWarnings("rawtypes")
private final PermitBackedRetryableFutureUtils permitBackedRetryableFutureUtils;

/**
* The max number of parts on S3 side is 10,000
*/
@@ -74,19 +77,20 @@
* Construct a new object of AsyncTransferManager
*
* @param minimumPartSize The minimum part size for parallel multipart uploads
- * @param executorService The stream reader {@link ExecutorService} for normal priority uploads
- * @param priorityExecutorService The stream read {@link ExecutorService} for high priority uploads
*/
+ @SuppressWarnings("rawtypes")
public AsyncTransferManager(
long minimumPartSize,
ExecutorService executorService,
ExecutorService priorityExecutorService,
- ExecutorService urgentExecutorService
+ ExecutorService urgentExecutorService,
+ PermitBackedRetryableFutureUtils permitBackedRetryableFutureUtils
) {
this.executorService = executorService;
this.priorityExecutorService = priorityExecutorService;
this.minimumPartSize = minimumPartSize;
this.urgentExecutorService = urgentExecutorService;
+ this.permitBackedRetryableFutureUtils = permitBackedRetryableFutureUtils;
}

/**
@@ -108,7 +112,7 @@ public CompletableFuture<Void> uploadObject(
try {
if (streamContext.getNumberOfParts() == 1) {
log.debug(() -> "Starting the upload as a single upload part request");
- uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext.provideStream(0), returnFuture, statsMetricPublisher);
+ uploadInOneChunk(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher);
} else {
log.debug(() -> "Starting the upload as multipart upload request");
uploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, statsMetricPublisher);
@@ -146,21 +150,19 @@ private void uploadInParts(
// Ensure cancellations are forwarded to the createMultipartUploadFuture future
CompletableFutureUtils.forwardExceptionTo(returnFuture, createMultipartUploadFuture);

- createMultipartUploadFuture.whenComplete((createMultipartUploadResponse, throwable) -> {
- if (throwable != null) {
- handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
- } else {
- log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
- doUploadInParts(
- s3AsyncClient,
- uploadRequest,
- streamContext,
- returnFuture,
- createMultipartUploadResponse.uploadId(),
- statsMetricPublisher
- );
- }
- });
+ String uploadId;
+ try {
+ // Block main thread here so that upload of parts doesn't get executed in future completion thread.
+ // We should never execute latent operation like acquisition of permit in future completion pool.
+ CreateMultipartUploadResponse createMultipartUploadResponse = createMultipartUploadFuture.get();
+ uploadId = createMultipartUploadResponse.uploadId();
+ log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
+ } catch (Exception ex) {
+ handleException(returnFuture, () -> "Failed to initiate multipart upload", ex);
+ return;
+ }
+
+ doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, uploadId, statsMetricPublisher);
}
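
The switch from a whenComplete callback to a blocking get() is deliberate, per the comment above: permit acquisition can block, and blocking inside a completion callback would run on, and potentially starve, the SDK's future-completion threads. A minimal before/after illustration (pool and variable names assumed):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Semaphore;

    class CompletionThreadSketch {
        // Hazard (old shape): the callback runs on whichever thread completes the future.
        static void hazard(CompletableFuture<String> createUploadFuture, Semaphore permits) {
            createUploadFuture.whenComplete((uploadId, failure) -> {
                permits.acquireUninterruptibly(); // may stall an SDK completion thread
                // ... schedule part uploads ...
            });
        }

        // New shape: resolve the upload id on the calling (snapshot) thread, so any
        // blocking permit acquisition happens off the completion pool.
        static void fix(CompletableFuture<String> createUploadFuture, Semaphore permits) throws Exception {
            String uploadId = createUploadFuture.get(); // caller blocks instead
            permits.acquireUninterruptibly();
            // ... schedule part uploads ...
        }
    }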

private void doUploadInParts(
@@ -189,7 +191,8 @@ private void doUploadInParts(
completedParts,
inputStreamContainers,
statsMetricPublisher,
- uploadRequest.isUploadRetryEnabled()
+ uploadRequest.isUploadRetryEnabled(),
+ permitBackedRetryableFutureUtils
);
} catch (Exception ex) {
try {
@@ -320,13 +323,14 @@ public long calculateOptimalPartSize(long contentLengthOfSource, WritePriority w
return (long) Math.max(optimalPartSize, minimumPartSize);
}

@SuppressWarnings("unchecked")
private void uploadInOneChunk(
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
- InputStreamContainer inputStreamContainer,
+ StreamContext streamContext,
CompletableFuture<Void> returnFuture,
StatsMetricPublisher statsMetricPublisher
- ) {
+ ) throws InterruptedException {
PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder()
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey())
@@ -349,14 +353,20 @@ private void uploadInOneChunk(
streamReadExecutor = executorService;
}

- InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream(
- inputStreamContainer.getInputStream(),
- uploadRequest.getWritePriority(),
- uploadRequest.isUploadRetryEnabled(),
- uploadRequest.getContentLength()
- );
- CompletableFuture<Void> putObjectFuture = SocketAccess.doPrivileged(
- () -> s3AsyncClient.putObject(
+ Supplier<CompletableFuture<Void>> putObjectFutureSupplier = () -> SocketAccess.doPrivileged(() -> {
+ InputStreamContainer inputStreamContainer;
+ try {
+ inputStreamContainer = streamContext.provideStream(0);
+ } catch (IOException e) {
+ return CompletableFuture.failedFuture(e);
+ }
+ InputStream inputStream = AsyncPartsHandler.maybeRetryInputStream(
+ inputStreamContainer.getInputStream(),
+ uploadRequest.getWritePriority(),
+ uploadRequest.isUploadRetryEnabled(),
+ uploadRequest.getContentLength()
+ );
+ return s3AsyncClient.putObject(
putObjectRequestBuilder.build(),
AsyncRequestBody.fromInputStream(inputStream, inputStreamContainer.getContentLength(), streamReadExecutor)
).handle((resp, throwable) -> {
@@ -395,7 +405,14 @@
}

return null;
- })
+ });
+ });

+ PermitBackedRetryableFutureUtils.RequestContext requestContext = permitBackedRetryableFutureUtils.createRequestContext();
+ CompletableFuture<Void> putObjectFuture = permitBackedRetryableFutureUtils.createPermitBackedRetryableFuture(
+ putObjectFutureSupplier,
+ uploadRequest.getWritePriority(),
+ requestContext
+ );

CompletableFutureUtils.forwardExceptionTo(returnFuture, putObjectFuture);
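
createComponents above hands the utility both the remote_transfer_retry pool and the thread pool's shared scheduler. A plausible division of labor, assuming retries are delayed rather than fired immediately (the actual delay policy lives in PermitBackedRetryableFutureUtils and is not shown in this excerpt):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class ScheduledRetrySketch {
        // Assumed exponential backoff: 100ms, 200ms, 400ms for the default 3 retries.
        static void scheduleRetry(ScheduledExecutorService scheduler, ExecutorService retryPool,
                                  Runnable retryTask, int attempt) {
            long delayMillis = 100L << attempt;
            // The scheduler only times the hop; the retry itself runs on the retry pool
            // so scheduler threads are never tied up by transfer work.
            scheduler.schedule(() -> retryPool.submit(retryTask), delayMillis, TimeUnit.MILLISECONDS);
        }
    }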