From c295d97f919cbb2519523f75d821dd7f870a901e Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Mon, 15 Apr 2024 13:29:53 -0700 Subject: [PATCH] [Backport 2.x] [segment replication] decouple the rateLimiter of segrep and recovery (#13181) * [segment replication] decouple the rateLimiter of segrep and recovery (#12959) * [segment replication] decouple the rateLimiter of segrep and recovery (12939) add setting "segrep.max_bytes_per_sec" Signed-off-by: maxliu * [segment replication] decouple the rateLimiter of segrep and recovery (12939) use setting "indices.replication.max_bytes_per_sec" if enable "indices.replication.use_individual_rate_limiter" Signed-off-by: maxliu * [segment replication] decouple the rateLimiter of segrep and recovery (12939) setting "indices.replication.max_bytes_per_sec" takes effect when not negative Signed-off-by: maxliu * [segment replication] decouple the rateLimiter of segrep and recovery (#12939) add setting "indices.replication.max_bytes_per_sec" which takes effect when not negative Signed-off-by: maxliu Adds change log Signed-off-by: maxliu --------- Signed-off-by: maxliu (cherry picked from commit 6bc04b49a0031e37466001469abcfd593e454813) Signed-off-by: github-actions[bot] * Add back public API in RecoverySettings to 2.x Signed-off-by: Marc Handalian --------- Signed-off-by: maxliu Signed-off-by: github-actions[bot] Signed-off-by: Marc Handalian Co-authored-by: github-actions[bot] Co-authored-by: Marc Handalian --- CHANGELOG.md | 1 + .../common/settings/ClusterSettings.java | 1 + .../recovery/PeerRecoveryTargetService.java | 8 +- .../indices/recovery/RecoverySettings.java | 87 +++++++++++++++---- .../recovery/RemoteRecoveryTargetHandler.java | 3 +- .../RemoteSegmentFileChunkWriter.java | 10 ++- .../SegmentReplicationSourceService.java | 3 +- .../SegmentReplicationTargetService.java | 2 +- .../blobstore/BlobStoreRepository.java | 4 +- .../RecoverySettingsDynamicUpdateTests.java | 24 ++++- 10 files changed, 116 insertions(+), 27 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fd135778cfaa..6e104b1066031 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967)) - Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868)) - Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686)) +- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959)) ### Dependencies - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896)) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 92bcbf9e9f3dd..df7f67c6295a3 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) { ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER, ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES, RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, + RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING, RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING, diff --git a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java index d09c2963c94ff..139e7b51d2200 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/opensearch/indices/recovery/PeerRecoveryTargetService.java @@ -576,7 +576,13 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha try (ReplicationRef recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) { final RecoveryTarget recoveryTarget = recoveryRef.get(); final ActionListener listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request); - recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); + recoveryTarget.handleFileChunk( + request, + recoveryTarget, + bytesSinceLastPause, + recoverySettings.recoveryRateLimiter(), + listener + ); } } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java index 53b42347aa30d..92704260322fd 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java @@ -65,6 +65,16 @@ public class RecoverySettings { Property.NodeScope ); + /** + * Individual speed setting for segment replication, default -1B to reuse the setting of recovery. + */ + public static final Setting INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting( + "indices.replication.max_bytes_per_sec", + new ByteSizeValue(-1), + Property.Dynamic, + Property.NodeScope + ); + /** * Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node. */ @@ -169,11 +179,13 @@ public class RecoverySettings { // choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1. public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES); - private volatile ByteSizeValue maxBytesPerSec; + private volatile ByteSizeValue recoveryMaxBytesPerSec; + private volatile ByteSizeValue replicationMaxBytesPerSec; private volatile int maxConcurrentFileChunks; private volatile int maxConcurrentOperations; private volatile int maxConcurrentRemoteStoreStreams; - private volatile SimpleRateLimiter rateLimiter; + private volatile SimpleRateLimiter recoveryRateLimiter; + private volatile SimpleRateLimiter replicationRateLimiter; private volatile TimeValue retryDelayStateSync; private volatile TimeValue retryDelayNetwork; private volatile TimeValue activityTimeout; @@ -198,17 +210,20 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings); this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings); - this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings); - if (maxBytesPerSec.getBytes() <= 0) { - rateLimiter = null; + this.recoveryMaxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings); + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + recoveryRateLimiter = null; } else { - rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac()); + recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); } + this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings); + updateReplicationRateLimiter(); - logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec); + logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec); this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings); - clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec); + clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks); clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations); clusterSettings.addSettingsUpdateConsumer( @@ -227,8 +242,22 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) { } + /** + * Method unused as of 2.14 but left as part of public API. + * Use recoveryRateLimiter or replicationRateLimiter instead. + * @return {@link RateLimiter} Recovery rate limiter + */ + @Deprecated(forRemoval = true, since = "2.14") public RateLimiter rateLimiter() { - return rateLimiter; + return recoveryRateLimiter; + } + + public RateLimiter recoveryRateLimiter() { + return recoveryRateLimiter; + } + + public RateLimiter replicationRateLimiter() { + return replicationRateLimiter; } public TimeValue retryDelayNetwork() { @@ -294,14 +323,40 @@ public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeout this.internalRemoteUploadTimeout = internalRemoteUploadTimeout; } - private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) { - this.maxBytesPerSec = maxBytesPerSec; - if (maxBytesPerSec.getBytes() <= 0) { - rateLimiter = null; - } else if (rateLimiter != null) { - rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac()); + private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) { + this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec; + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + recoveryRateLimiter = null; + } else if (recoveryRateLimiter != null) { + recoveryRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac()); } else { - rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac()); + recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); + } + if (replicationMaxBytesPerSec.getBytes() < 0) updateReplicationRateLimiter(); + } + + private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) { + this.replicationMaxBytesPerSec = replicationMaxBytesPerSec; + updateReplicationRateLimiter(); + } + + private void updateReplicationRateLimiter() { + if (replicationMaxBytesPerSec.getBytes() >= 0) { + if (replicationMaxBytesPerSec.getBytes() == 0) { + replicationRateLimiter = null; + } else if (replicationRateLimiter != null) { + replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac()); + } else { + replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac()); + } + } else { // when replicationMaxBytesPerSec = -1B, use setting of recovery + if (recoveryMaxBytesPerSec.getBytes() <= 0) { + replicationRateLimiter = null; + } else if (replicationRateLimiter != null) { + replicationRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac()); + } else { + replicationRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac()); + } } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java index 37227596fdfe7..4d4c20f778ef3 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RemoteRecoveryTargetHandler.java @@ -111,7 +111,8 @@ public RemoteRecoveryTargetHandler( shardId, PeerRecoveryTargetService.Actions.FILE_CHUNK, requestSeqNoGenerator, - onSourceThrottle + onSourceThrottle, + recoverySettings::recoveryRateLimiter ); this.remoteStoreEnabled = remoteStoreEnabled; } diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java index b52fe66816098..179e497565326 100644 --- a/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java +++ b/server/src/main/java/org/opensearch/indices/replication/RemoteSegmentFileChunkWriter.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.Supplier; /** * This class handles sending file chunks over the transport layer to a target shard. @@ -36,11 +37,11 @@ public final class RemoteSegmentFileChunkWriter implements FileChunkWriter { private final AtomicLong requestSeqNoGenerator; private final RetryableTransportClient retryableTransportClient; private final ShardId shardId; - private final RecoverySettings recoverySettings; private final long replicationId; private final AtomicLong bytesSinceLastPause = new AtomicLong(); private final TransportRequestOptions fileChunkRequestOptions; private final Consumer onSourceThrottle; + private final Supplier rateLimiterSupplier; private final String action; public RemoteSegmentFileChunkWriter( @@ -50,14 +51,15 @@ public RemoteSegmentFileChunkWriter( ShardId shardId, String action, AtomicLong requestSeqNoGenerator, - Consumer onSourceThrottle + Consumer onSourceThrottle, + Supplier rateLimiterSupplier ) { this.replicationId = replicationId; - this.recoverySettings = recoverySettings; this.retryableTransportClient = retryableTransportClient; this.shardId = shardId; this.requestSeqNoGenerator = requestSeqNoGenerator; this.onSourceThrottle = onSourceThrottle; + this.rateLimiterSupplier = rateLimiterSupplier; this.fileChunkRequestOptions = TransportRequestOptions.builder() .withType(TransportRequestOptions.Type.RECOVERY) .withTimeout(recoverySettings.internalActionTimeout()) @@ -78,7 +80,7 @@ public void writeFileChunk( // Pause using the rate limiter, if desired, to throttle the recovery final long throttleTimeInNanos; // always fetch the ratelimiter - it might be updated in real-time on the recovery settings - final RateLimiter rl = recoverySettings.rateLimiter(); + final RateLimiter rl = rateLimiterSupplier.get(); if (rl != null) { long bytes = bytesSinceLastPause.addAndGet(content.length()); if (bytes > rl.getMinPauseCheckBytes()) { diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java index a393faabae0ea..3a762200e4a82 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java @@ -124,7 +124,8 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan request.getCheckpoint().getShardId(), SegmentReplicationTargetService.Actions.FILE_CHUNK, new AtomicLong(0), - (throttleTime) -> {} + (throttleTime) -> {}, + recoverySettings::replicationRateLimiter ); final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter); channel.sendResponse( diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 4942d39cfa48a..fbd7ab7cea346 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -635,7 +635,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha try (ReplicationRef ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) { final SegmentReplicationTarget target = ref.get(); final ActionListener listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request); - target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener); + target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener); } } } diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index f6a7dcc68d311..e9241bb8d88a4 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -3291,7 +3291,7 @@ private static OffsetRangeInputStream maybeRateLimitRemoteTransfers( public InputStream maybeRateLimitRestores(InputStream stream) { return maybeRateLimit( maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE), - recoverySettings::rateLimiter, + recoverySettings::recoveryRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE ); @@ -3314,7 +3314,7 @@ public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream remoteDownloadRateLimitingTimeInNanos, BlobStoreTransferContext.REMOTE_DOWNLOAD ), - recoverySettings::rateLimiter, + recoverySettings::recoveryRateLimiter, remoteDownloadRateLimitingTimeInNanos, BlobStoreTransferContext.REMOTE_DOWNLOAD ); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java index 75639661f539d..2793d446d66c8 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoverySettingsDynamicUpdateTests.java @@ -35,6 +35,8 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.test.OpenSearchTestCase; import java.util.concurrent.TimeUnit; @@ -47,7 +49,27 @@ public void testZeroBytesPerSecondIsNoRateLimit() { clusterSettings.applySettings( Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() ); - assertEquals(null, recoverySettings.rateLimiter()); + assertNull(recoverySettings.recoveryRateLimiter()); + clusterSettings.applySettings( + Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build() + ); + assertNull(recoverySettings.replicationRateLimiter()); + } + + public void testSetReplicationMaxBytesPerSec() { + assertEquals(40, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); + clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB)) + .build() + ); + assertEquals(60, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); + clusterSettings.applySettings( + Settings.builder() + .put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB)) + .build() + ); + assertEquals(80, (int) recoverySettings.replicationRateLimiter().getMBPerSec()); } public void testRetryDelayStateSync() {