Skip to content

Commit

Permalink
[segment replication] decouple the rateLimiter of segrep and recovery (
Browse files Browse the repository at this point in the history
…#12939)

add setting "indices.replication.max_bytes_per_sec" which takes effect when not negative

Signed-off-by: maxliu <[email protected]>

Adds change log

Signed-off-by: maxliu <[email protected]>
  • Loading branch information
Ferrari248 committed Apr 11, 2024
1 parent cf5ecd1 commit 8c1a6ce
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Make translog transfer timeout configurable ([#12704](https://github.com/opensearch-project/OpenSearch/pull/12704))
- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ public final class RemoteSegmentFileChunkWriter implements FileChunkWriter {
private final AtomicLong requestSeqNoGenerator;
private final RetryableTransportClient retryableTransportClient;
private final ShardId shardId;
private final RecoverySettings recoverySettings;
private final long replicationId;
private final AtomicLong bytesSinceLastPause = new AtomicLong();
private final TransportRequestOptions fileChunkRequestOptions;
private final Consumer<Long> onSourceThrottle;
private final Supplier<RateLimiter> rateLimiterProvider;
private final Supplier<RateLimiter> rateLimiterSupplier;
private final String action;

public RemoteSegmentFileChunkWriter(
Expand All @@ -53,15 +52,14 @@ public RemoteSegmentFileChunkWriter(
String action,
AtomicLong requestSeqNoGenerator,
Consumer<Long> onSourceThrottle,
Supplier<RateLimiter> rateLimiterProvider
Supplier<RateLimiter> rateLimiterSupplier
) {
this.replicationId = replicationId;
this.recoverySettings = recoverySettings;
this.retryableTransportClient = retryableTransportClient;
this.shardId = shardId;
this.requestSeqNoGenerator = requestSeqNoGenerator;
this.onSourceThrottle = onSourceThrottle;
this.rateLimiterProvider = rateLimiterProvider;
this.rateLimiterSupplier = rateLimiterSupplier;
this.fileChunkRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
Expand All @@ -82,7 +80,7 @@ public void writeFileChunk(
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
final RateLimiter rl = rateLimiterProvider.get();
final RateLimiter rl = rateLimiterSupplier.get();
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
Expand Down

0 comments on commit 8c1a6ce

Please sign in to comment.