Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Stream read pool and default s3 timeouts tuning #11436

Merged
merged 1 commit into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ final class S3ClientSettings {
static final Setting.AffixSetting<TimeValue> REQUEST_TIMEOUT_SETTING = Setting.affixKeySetting(
PREFIX,
"request_timeout",
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(5), Property.NodeScope)
);

/** The connection timeout for connecting to s3. */
Expand All @@ -198,14 +198,14 @@ final class S3ClientSettings {
static final Setting.AffixSetting<Integer> MAX_CONNECTIONS_SETTING = Setting.affixKeySetting(
PREFIX,
"max_connections",
key -> Setting.intSetting(key, 100, Property.NodeScope)
key -> Setting.intSetting(key, 500, Property.NodeScope)
);

/** Connection acquisition timeout for new connections to S3. */
static final Setting.AffixSetting<TimeValue> CONNECTION_ACQUISITION_TIMEOUT = Setting.affixKeySetting(
PREFIX,
"connection_acquisition_timeout",
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(2), Property.NodeScope)
key -> Setting.timeSetting(key, TimeValue.timeValueMinutes(15), Property.NodeScope)
);

/** The maximum pending connections to S3. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,23 +99,32 @@
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
List<ExecutorBuilder<?>> executorBuilders = new ArrayList<>();
int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors(settings));
int halfProc = halfNumberOfProcessors(allocatedProcessors(settings));

Check warning on line 102 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L102

Added line #L102 was not covered by tests
executorBuilders.add(
new FixedExecutorBuilder(settings, URGENT_FUTURE_COMPLETION, urgentPoolCount(settings), 10_000, URGENT_FUTURE_COMPLETION)
);
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(new ScalingExecutorBuilder(URGENT_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));

Check warning on line 106 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L106

Added line #L106 was not covered by tests
executorBuilders.add(
new FixedExecutorBuilder(settings, PRIORITY_FUTURE_COMPLETION, priorityPoolCount(settings), 10_000, PRIORITY_FUTURE_COMPLETION)
new ScalingExecutorBuilder(PRIORITY_FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))

Check warning on line 108 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L108

Added line #L108 was not covered by tests
);
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(new ScalingExecutorBuilder(PRIORITY_STREAM_READER, 1, halfProc, TimeValue.timeValueMinutes(5)));

Check warning on line 110 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L110

Added line #L110 was not covered by tests

executorBuilders.add(new FixedExecutorBuilder(settings, FUTURE_COMPLETION, normalPoolCount(settings), 10_000, FUTURE_COMPLETION));
executorBuilders.add(new ScalingExecutorBuilder(STREAM_READER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
executorBuilders.add(
new ScalingExecutorBuilder(FUTURE_COMPLETION, 1, allocatedProcessors(settings), TimeValue.timeValueMinutes(5))

Check warning on line 113 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L112-L113

Added lines #L112 - L113 were not covered by tests
);
executorBuilders.add(

Check warning on line 115 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L115

Added line #L115 was not covered by tests
new ScalingExecutorBuilder(
STREAM_READER,
allocatedProcessors(settings),
4 * allocatedProcessors(settings),
TimeValue.timeValueMinutes(5)

Check warning on line 120 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L118-L120

Added lines #L118 - L120 were not covered by tests
)
);
return executorBuilders;
}

static int halfAllocatedProcessorsMaxFive(final int allocatedProcessors) {
return boundedBy((allocatedProcessors + 1) / 2, 1, 5);
static int halfNumberOfProcessors(int numberOfProcessors) {
return (numberOfProcessors + 1) / 2;

Check warning on line 127 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3RepositoryPlugin.java#L127

Added line #L127 was not covered by tests
}

S3RepositoryPlugin(final Settings settings, final Path configPath, final S3Service service, final S3AsyncService s3AsyncService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import software.amazon.awssdk.metrics.MetricPublisher;
import software.amazon.awssdk.metrics.MetricRecord;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.blobstore.BlobStore;

import java.time.Duration;
Expand All @@ -21,6 +23,7 @@

public class StatsMetricPublisher {

private static final Logger LOGGER = LogManager.getLogger(StatsMetricPublisher.class);
private final Stats stats = new Stats();

private final Map<BlobStore.Metric, Stats> extendedStats = new HashMap<>() {
Expand All @@ -35,6 +38,7 @@
public MetricPublisher listObjectsMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "List objects request metrics: " + metricCollection);

Check warning on line 41 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java#L41

Added line #L41 was not covered by tests
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -64,6 +68,7 @@
public MetricPublisher deleteObjectsMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Delete objects request metrics: " + metricCollection);

Check warning on line 71 in plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java

View check run for this annotation

Codecov / codecov/patch

plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/StatsMetricPublisher.java#L71

Added line #L71 was not covered by tests
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -93,6 +98,7 @@
public MetricPublisher getObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Get object request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -122,6 +128,7 @@
public MetricPublisher putObjectMetricPublisher = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Put object request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down Expand Up @@ -151,6 +158,7 @@
public MetricPublisher multipartUploadMetricCollector = new MetricPublisher() {
@Override
public void publish(MetricCollection metricCollection) {
LOGGER.debug(() -> "Multi-part request metrics: " + metricCollection);
for (MetricRecord<?> metricRecord : metricCollection) {
switch (metricRecord.metric().name()) {
case "ApiCallDuration":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.common.io.InputStreamContainer;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.repositories.s3.SocketAccess;
import org.opensearch.repositories.s3.StatsMetricPublisher;
import org.opensearch.repositories.s3.io.CheckedContainer;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -55,6 +56,7 @@ public class AsyncPartsHandler {
* @param completedParts Reference of completed parts
* @param inputStreamContainers Checksum containers
* @return list of completable futures
* @param statsMetricPublisher sdk metric publisher
* @throws IOException thrown in case of an IO error
*/
public static List<CompletableFuture<CompletedPart>> uploadParts(
Expand All @@ -66,7 +68,8 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
StreamContext streamContext,
String uploadId,
AtomicReferenceArray<CompletedPart> completedParts,
AtomicReferenceArray<CheckedContainer> inputStreamContainers
AtomicReferenceArray<CheckedContainer> inputStreamContainers,
StatsMetricPublisher statsMetricPublisher
) throws IOException {
List<CompletableFuture<CompletedPart>> futures = new ArrayList<>();
for (int partIdx = 0; partIdx < streamContext.getNumberOfParts(); partIdx++) {
Expand All @@ -77,6 +80,7 @@ public static List<CompletableFuture<CompletedPart>> uploadParts(
.partNumber(partIdx + 1)
.key(uploadRequest.getKey())
.uploadId(uploadId)
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
.contentLength(inputStreamContainer.getContentLength());
if (uploadRequest.doRemoteDataIntegrityCheck()) {
uploadPartRequestBuilder.checksumAlgorithm(ChecksumAlgorithm.CRC32);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,14 @@ private void uploadInParts(
handleException(returnFuture, () -> "Failed to initiate multipart upload", throwable);
} else {
log.debug(() -> "Initiated new multipart upload, uploadId: " + createMultipartUploadResponse.uploadId());
doUploadInParts(s3AsyncClient, uploadRequest, streamContext, returnFuture, createMultipartUploadResponse.uploadId());
doUploadInParts(
s3AsyncClient,
uploadRequest,
streamContext,
returnFuture,
createMultipartUploadResponse.uploadId(),
statsMetricPublisher
);
}
});
}
Expand All @@ -156,7 +163,8 @@ private void doUploadInParts(
UploadRequest uploadRequest,
StreamContext streamContext,
CompletableFuture<Void> returnFuture,
String uploadId
String uploadId,
StatsMetricPublisher statsMetricPublisher
) {

// The list of completed parts must be sorted
Expand All @@ -174,7 +182,8 @@ private void doUploadInParts(
streamContext,
uploadId,
completedParts,
inputStreamContainers
inputStreamContainers,
statsMetricPublisher
);
} catch (Exception ex) {
try {
Expand All @@ -198,7 +207,7 @@ private void doUploadInParts(
}
return null;
})
.thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts))
.thenCompose(ignore -> completeMultipartUpload(s3AsyncClient, uploadRequest, uploadId, completedParts, statsMetricPublisher))
.handle(handleExceptionOrResponse(s3AsyncClient, uploadRequest, returnFuture, uploadId))
.exceptionally(throwable -> {
handleException(returnFuture, () -> "Unexpected exception occurred", throwable);
Expand Down Expand Up @@ -245,7 +254,8 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
S3AsyncClient s3AsyncClient,
UploadRequest uploadRequest,
String uploadId,
AtomicReferenceArray<CompletedPart> completedParts
AtomicReferenceArray<CompletedPart> completedParts,
StatsMetricPublisher statsMetricPublisher
) {

log.debug(() -> new ParameterizedMessage("Sending completeMultipartUploadRequest, uploadId: {}", uploadId));
Expand All @@ -254,6 +264,7 @@ private CompletableFuture<CompleteMultipartUploadResponse> completeMultipartUplo
.bucket(uploadRequest.getBucket())
.key(uploadRequest.getKey())
.uploadId(uploadId)
.overrideConfiguration(o -> o.addMetricPublisher(statsMetricPublisher.multipartUploadMetricCollector))
.multipartUpload(CompletedMultipartUpload.builder().parts(parts).build())
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ public void testThereIsADefaultClientByDefault() {
assertThat(defaultSettings.protocol, is(Protocol.HTTPS));
assertThat(defaultSettings.proxySettings, is(ProxySettings.NO_PROXY_SETTINGS));
assertThat(defaultSettings.readTimeoutMillis, is(50 * 1000));
assertThat(defaultSettings.requestTimeoutMillis, is(120 * 1000));
assertThat(defaultSettings.requestTimeoutMillis, is(5 * 60 * 1000));
assertThat(defaultSettings.connectionTimeoutMillis, is(10 * 1000));
assertThat(defaultSettings.connectionTTLMillis, is(5 * 1000));
assertThat(defaultSettings.maxConnections, is(100));
assertThat(defaultSettings.maxConnections, is(500));
assertThat(defaultSettings.maxRetries, is(3));
assertThat(defaultSettings.throttleRetries, is(true));
}
Expand Down
Loading