Skip to content

Commit

Permalink
Producer-consumer queue to track in-queue transfer events
Browse files Browse the repository at this point in the history
Signed-off-by: vikasvb90 <[email protected]>
  • Loading branch information
vikasvb90 authored and Bhumika Saini committed May 2, 2024
1 parent 72cab64 commit bcaa6ac
Show file tree
Hide file tree
Showing 21 changed files with 1,132 additions and 1,002 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,21 @@ protected S3Repository createRepository(
ClusterService clusterService,
RecoverySettings recoverySettings
) {
return new S3Repository(metadata, registry, service, clusterService, recoverySettings, null, null, null, null, null, false) {
return new S3Repository(
metadata,
registry,
service,
clusterService,
recoverySettings,
null,
null,
null,
null,
null,
false,
null,
null
) {

@Override
public BlobStore blobStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;

Expand Down Expand Up @@ -218,7 +219,13 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
writeContext.getMetadata()
);
try {
if (uploadRequest.getContentLength() > ByteSizeUnit.GB.toBytes(10) && blobStore.isRedirectLargeUploads()) {
// If file size is greater than the queue capacity than SizeBasedBlockingQ will always reject the upload.
// Therefore, redirecting it to slow client.
if ((uploadRequest.getWritePriority() == WritePriority.LOW
&& blobStore.getLowPrioritySizeBasedBlockingQ().canProduce(uploadRequest.getContentLength()) == false)
|| (uploadRequest.getWritePriority() != WritePriority.HIGH
&& uploadRequest.getWritePriority() != WritePriority.URGENT
&& blobStore.getOtherPrioritySizeBasedBlockingQ().canProduce(uploadRequest.getContentLength()) == false)) {
StreamContext streamContext = SocketAccess.doPrivileged(
() -> writeContext.getStreamProvider(uploadRequest.getContentLength())
);
Expand Down Expand Up @@ -258,23 +265,51 @@ public void asyncBlobUpload(WriteContext writeContext, ActionListener<Void> comp
} else {
s3AsyncClient = amazonS3Reference.get().client();
}
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(response);
} else {
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
completionListener.onFailure(ex);
}
});

if (writeContext.getWritePriority() == WritePriority.URGENT || writeContext.getWritePriority() == WritePriority.HIGH) {
createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener);
} else if (writeContext.getWritePriority() == WritePriority.LOW) {
blobStore.getLowPrioritySizeBasedBlockingQ()
.produce(
new SizeBasedBlockingQ.Item(
writeContext.getFileSize(),
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
)
);
} else {
blobStore.getOtherPrioritySizeBasedBlockingQ()
.produce(
new SizeBasedBlockingQ.Item(
writeContext.getFileSize(),
() -> createFileCompletableFuture(s3AsyncClient, uploadRequest, streamContext, completionListener)
)
);
}
}
} catch (Exception e) {
logger.info("exception error from blob container for file {}", writeContext.getFileName());
throw new IOException(e);
}
}

private CompletableFuture<Void> createFileCompletableFuture(
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
StreamContext streamContext,
ActionListener<Void> completionListener
) {
CompletableFuture<Void> completableFuture = blobStore.getAsyncTransferManager()
.uploadObject(s3AsyncClient, uploadRequest, streamContext, blobStore.getStatsMetricPublisher());
return completableFuture.whenComplete((response, throwable) -> {
if (throwable == null) {
completionListener.onResponse(response);
} else {
Exception ex = throwable instanceof Error ? new Exception(throwable) : (Exception) throwable;
completionListener.onFailure(ex);
}
});
}

@ExperimentalApi
@Override
public void readBlobAsync(String blobName, ActionListener<ReadContext> listener) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -94,6 +95,8 @@ class S3BlobStore implements BlobStore {
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;
private final boolean multipartUploadEnabled;
private final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;

S3BlobStore(
S3Service service,
Expand All @@ -109,7 +112,9 @@ class S3BlobStore implements BlobStore {
AsyncTransferManager asyncTransferManager,
AsyncExecutorContainer urgentExecutorBuilder,
AsyncExecutorContainer priorityExecutorBuilder,
AsyncExecutorContainer normalExecutorBuilder
AsyncExecutorContainer normalExecutorBuilder,
SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ,
SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
) {
this.service = service;
this.s3AsyncService = s3AsyncService;
Expand All @@ -128,6 +133,8 @@ class S3BlobStore implements BlobStore {
// Settings to initialize blobstore with.
this.redirectLargeUploads = REDIRECT_LARGE_S3_UPLOAD.get(repositoryMetadata.settings());
this.uploadRetryEnabled = UPLOAD_RETRY_ENABLED.get(repositoryMetadata.settings());
this.otherPrioritySizeBasedBlockingQ = otherPrioritySizeBasedBlockingQ;
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;
}

@Override
Expand Down Expand Up @@ -184,6 +191,14 @@ public int getBulkDeletesSize() {
return bulkDeletesSize;
}

public SizeBasedBlockingQ getOtherPrioritySizeBasedBlockingQ() {
return otherPrioritySizeBasedBlockingQ;
}

public SizeBasedBlockingQ getLowPrioritySizeBasedBlockingQ() {
return lowPrioritySizeBasedBlockingQ;
}

@Override
public BlobContainer blobContainer(BlobPath path) {
return new S3BlobContainer(path, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.common.settings.SecureSetting;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.settings.SecureString;
Expand All @@ -63,6 +64,7 @@
import org.opensearch.repositories.blobstore.MeteredBlobStoreRepository;
import org.opensearch.repositories.s3.async.AsyncExecutorContainer;
import org.opensearch.repositories.s3.async.AsyncTransferManager;
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
import org.opensearch.snapshots.SnapshotId;
import org.opensearch.snapshots.SnapshotInfo;
import org.opensearch.threadpool.Scheduler;
Expand Down Expand Up @@ -193,12 +195,6 @@ class S3Repository extends MeteredBlobStoreRepository {
true,
Setting.Property.NodeScope
);

/**
* Number of retries in case of a transfer failure.
*/
public static Setting<Integer> S3_MAX_TRANSFER_RETRIES = Setting.intSetting("s3_max_transfer_retries", 3, Setting.Property.NodeScope);

/**
* Percentage of total available permits to be available for priority transfers.
*/
Expand All @@ -210,6 +206,27 @@ class S3Repository extends MeteredBlobStoreRepository {
Setting.Property.NodeScope
);

/**
* Duration in minutes to wait for a permit in case no permit is available.
*/
public static Setting<Integer> S3_PERMIT_WAIT_DURATION_MIN = Setting.intSetting(
"s3_permit_wait_duration_min",
5,
1,
10,
Setting.Property.NodeScope
);

/**
* Number of transfer queue consumers
*/
public static Setting<Integer> S3_TRANSFER_QUEUE_CONSUMERS = new Setting<>(
"s3_transfer_queue_consumers",
(s) -> Integer.toString(Math.max(5, OpenSearchExecutors.allocatedProcessors(s) * 2)),
(s) -> Setting.parseInt(s, 5, "s3_transfer_queue_consumers"),
Setting.Property.NodeScope
);

/**
* Big files can be broken down into chunks during snapshotting if needed. Defaults to 1g.
*/
Expand Down Expand Up @@ -268,6 +285,8 @@ class S3Repository extends MeteredBlobStoreRepository {
private final AsyncExecutorContainer priorityExecutorBuilder;
private final AsyncExecutorContainer normalExecutorBuilder;
private final Path pluginConfigPath;
private final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ;
private final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ;

private volatile int bulkDeletesSize;

Expand All @@ -283,7 +302,9 @@ class S3Repository extends MeteredBlobStoreRepository {
final AsyncExecutorContainer priorityExecutorBuilder,
final AsyncExecutorContainer normalExecutorBuilder,
final S3AsyncService s3AsyncService,
final boolean multipartUploadEnabled
final boolean multipartUploadEnabled,
final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ,
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
) {
this(
metadata,
Expand All @@ -297,7 +318,9 @@ class S3Repository extends MeteredBlobStoreRepository {
normalExecutorBuilder,
s3AsyncService,
multipartUploadEnabled,
Path.of("")
Path.of(""),
otherPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
);
}

Expand All @@ -316,7 +339,9 @@ class S3Repository extends MeteredBlobStoreRepository {
final AsyncExecutorContainer normalExecutorBuilder,
final S3AsyncService s3AsyncService,
final boolean multipartUploadEnabled,
Path pluginConfigPath
Path pluginConfigPath,
final SizeBasedBlockingQ otherPrioritySizeBasedBlockingQ,
final SizeBasedBlockingQ lowPrioritySizeBasedBlockingQ
) {
super(metadata, namedXContentRegistry, clusterService, recoverySettings, buildLocation(metadata));
this.service = service;
Expand All @@ -327,6 +352,8 @@ class S3Repository extends MeteredBlobStoreRepository {
this.urgentExecutorBuilder = urgentExecutorBuilder;
this.priorityExecutorBuilder = priorityExecutorBuilder;
this.normalExecutorBuilder = normalExecutorBuilder;
this.otherPrioritySizeBasedBlockingQ = otherPrioritySizeBasedBlockingQ;
this.lowPrioritySizeBasedBlockingQ = lowPrioritySizeBasedBlockingQ;

validateRepositoryMetadata(metadata);
readRepositoryMetadata();
Expand Down Expand Up @@ -389,7 +416,9 @@ protected S3BlobStore createBlobStore() {
asyncUploadUtils,
urgentExecutorBuilder,
priorityExecutorBuilder,
normalExecutorBuilder
normalExecutorBuilder,
otherPrioritySizeBasedBlockingQ,
lowPrioritySizeBasedBlockingQ
);
}

Expand Down
Loading

0 comments on commit bcaa6ac

Please sign in to comment.