Skip to content

Commit

Permalink
[Backport 2.x] [segment replication] decouple the rateLimiter of segr…
Browse files Browse the repository at this point in the history
…ep and recovery (opensearch-project#13181)

* [segment replication] decouple the rateLimiter of segrep and recovery (opensearch-project#12959)

* [segment replication] decouple the rateLimiter of segrep and recovery (12939)

add setting "segrep.max_bytes_per_sec"

Signed-off-by: maxliu <[email protected]>

* [segment replication] decouple the rateLimiter of segrep and recovery (12939)

use setting "indices.replication.max_bytes_per_sec" if enable "indices.replication.use_individual_rate_limiter"

Signed-off-by: maxliu <[email protected]>

* [segment replication] decouple the rateLimiter of segrep and recovery (12939)

setting "indices.replication.max_bytes_per_sec" takes effect when not negative

Signed-off-by: maxliu <[email protected]>

* [segment replication] decouple the rateLimiter of segrep and recovery (opensearch-project#12939)

add setting "indices.replication.max_bytes_per_sec" which takes effect when not negative

Signed-off-by: maxliu <[email protected]>

Adds change log

Signed-off-by: maxliu <[email protected]>

---------

Signed-off-by: maxliu <[email protected]>
(cherry picked from commit 6bc04b4)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>

* Add back public API in RecoverySettings to 2.x

Signed-off-by: Marc Handalian <[email protected]>

---------

Signed-off-by: maxliu <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Signed-off-by: Marc Handalian <[email protected]>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Marc Handalian <[email protected]>
  • Loading branch information
3 people authored Apr 15, 2024
1 parent b3c3255 commit c295d97
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,13 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
recoveryTarget.handleFileChunk(
request,
recoveryTarget,
bytesSinceLastPause,
recoverySettings.recoveryRateLimiter(),
listener
);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ public class RecoverySettings {
Property.NodeScope
);

/**
* Individual speed setting for segment replication, default -1B to reuse the setting of recovery.
*/
public static final Setting<ByteSizeValue> INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
"indices.replication.max_bytes_per_sec",
new ByteSizeValue(-1),
Property.Dynamic,
Property.NodeScope
);

/**
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
*/
Expand Down Expand Up @@ -169,11 +179,13 @@ public class RecoverySettings {
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

private volatile ByteSizeValue maxBytesPerSec;
private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile ByteSizeValue replicationMaxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
private volatile int maxConcurrentRemoteStoreStreams;
private volatile SimpleRateLimiter rateLimiter;
private volatile SimpleRateLimiter recoveryRateLimiter;
private volatile SimpleRateLimiter replicationRateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
private volatile TimeValue activityTimeout;
Expand All @@ -198,17 +210,20 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings);

this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (maxBytesPerSec.getBytes() <= 0) {
rateLimiter = null;
this.recoveryMaxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
recoveryRateLimiter = null;
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings);
updateReplicationRateLimiter();

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
clusterSettings.addSettingsUpdateConsumer(
Expand All @@ -227,8 +242,22 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {

}

/**
* Method unused as of 2.14 but left as part of public API.
* Use recoveryRateLimiter or replicationRateLimiter instead.
* @return {@link RateLimiter} Recovery rate limiter
*/
@Deprecated(forRemoval = true, since = "2.14")
public RateLimiter rateLimiter() {
return rateLimiter;
return recoveryRateLimiter;
}

public RateLimiter recoveryRateLimiter() {
return recoveryRateLimiter;
}

public RateLimiter replicationRateLimiter() {
return replicationRateLimiter;
}

public TimeValue retryDelayNetwork() {
Expand Down Expand Up @@ -294,14 +323,40 @@ public void setInternalRemoteUploadTimeout(TimeValue internalRemoteUploadTimeout
this.internalRemoteUploadTimeout = internalRemoteUploadTimeout;
}

private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
this.maxBytesPerSec = maxBytesPerSec;
if (maxBytesPerSec.getBytes() <= 0) {
rateLimiter = null;
} else if (rateLimiter != null) {
rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) {
this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec;
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
recoveryRateLimiter = null;
} else if (recoveryRateLimiter != null) {
recoveryRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
if (replicationMaxBytesPerSec.getBytes() < 0) updateReplicationRateLimiter();
}

private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) {
this.replicationMaxBytesPerSec = replicationMaxBytesPerSec;
updateReplicationRateLimiter();
}

private void updateReplicationRateLimiter() {
if (replicationMaxBytesPerSec.getBytes() >= 0) {
if (replicationMaxBytesPerSec.getBytes() == 0) {
replicationRateLimiter = null;
} else if (replicationRateLimiter != null) {
replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac());
} else {
replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac());
}
} else { // when replicationMaxBytesPerSec = -1B, use setting of recovery
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
replicationRateLimiter = null;
} else if (replicationRateLimiter != null) {
replicationRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
} else {
replicationRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public RemoteRecoveryTargetHandler(
shardId,
PeerRecoveryTargetService.Actions.FILE_CHUNK,
requestSeqNoGenerator,
onSourceThrottle
onSourceThrottle,
recoverySettings::recoveryRateLimiter
);
this.remoteStoreEnabled = remoteStoreEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* This class handles sending file chunks over the transport layer to a target shard.
Expand All @@ -36,11 +37,11 @@ public final class RemoteSegmentFileChunkWriter implements FileChunkWriter {
private final AtomicLong requestSeqNoGenerator;
private final RetryableTransportClient retryableTransportClient;
private final ShardId shardId;
private final RecoverySettings recoverySettings;
private final long replicationId;
private final AtomicLong bytesSinceLastPause = new AtomicLong();
private final TransportRequestOptions fileChunkRequestOptions;
private final Consumer<Long> onSourceThrottle;
private final Supplier<RateLimiter> rateLimiterSupplier;
private final String action;

public RemoteSegmentFileChunkWriter(
Expand All @@ -50,14 +51,15 @@ public RemoteSegmentFileChunkWriter(
ShardId shardId,
String action,
AtomicLong requestSeqNoGenerator,
Consumer<Long> onSourceThrottle
Consumer<Long> onSourceThrottle,
Supplier<RateLimiter> rateLimiterSupplier
) {
this.replicationId = replicationId;
this.recoverySettings = recoverySettings;
this.retryableTransportClient = retryableTransportClient;
this.shardId = shardId;
this.requestSeqNoGenerator = requestSeqNoGenerator;
this.onSourceThrottle = onSourceThrottle;
this.rateLimiterSupplier = rateLimiterSupplier;
this.fileChunkRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
Expand All @@ -78,7 +80,7 @@ public void writeFileChunk(
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
final RateLimiter rl = recoverySettings.rateLimiter();
final RateLimiter rl = rateLimiterSupplier.get();
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ public void messageReceived(CheckpointInfoRequest request, TransportChannel chan
request.getCheckpoint().getShardId(),
SegmentReplicationTargetService.Actions.FILE_CHUNK,
new AtomicLong(0),
(throttleTime) -> {}
(throttleTime) -> {},
recoverySettings::replicationRateLimiter
);
final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter);
channel.sendResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
final SegmentReplicationTarget target = ref.get();
final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3291,7 +3291,7 @@ private static OffsetRangeInputStream maybeRateLimitRemoteTransfers(
public InputStream maybeRateLimitRestores(InputStream stream) {
return maybeRateLimit(
maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE),
recoverySettings::rateLimiter,
recoverySettings::recoveryRateLimiter,
restoreRateLimitingTimeInNanos,
BlobStoreTransferContext.SNAPSHOT_RESTORE
);
Expand All @@ -3314,7 +3314,7 @@ public InputStream maybeRateLimitRemoteDownloadTransfers(InputStream inputStream
remoteDownloadRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_DOWNLOAD
),
recoverySettings::rateLimiter,
recoverySettings::recoveryRateLimiter,
remoteDownloadRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_DOWNLOAD
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.TimeUnit;
Expand All @@ -47,7 +49,27 @@ public void testZeroBytesPerSecondIsNoRateLimit() {
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertEquals(null, recoverySettings.rateLimiter());
assertNull(recoverySettings.recoveryRateLimiter());
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertNull(recoverySettings.replicationRateLimiter());
}

public void testSetReplicationMaxBytesPerSec() {
assertEquals(40, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB))
.build()
);
assertEquals(60, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB))
.build()
);
assertEquals(80, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
}

public void testRetryDelayStateSync() {
Expand Down

0 comments on commit c295d97

Please sign in to comment.