Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[segment replication] decouple the rateLimiter of segrep and recovery #12959

Merged
merged 9 commits into from
Apr 14, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Reject Resize index requests (i.e, split, shrink and clone), While DocRep to SegRep migration is in progress.([#12686](https://github.com/opensearch-project/OpenSearch/pull/12686))
- Add support for more than one protocol for transport ([#12967](https://github.com/opensearch-project/OpenSearch/pull/12967))
- Add changes for overriding remote store and replication settings during snapshot restore. ([#11868](https://github.com/opensearch-project/OpenSearch/pull/11868))
- Add an individual setting of rate limiter for segment replication ([#12959](https://github.com/opensearch-project/OpenSearch/pull/12959))

### Dependencies
- Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ public void apply(Settings value, Settings current, Settings previous) {
ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_CLUSTER,
ShardLimitValidator.SETTING_CLUSTER_IGNORE_DOT_INDEXES,
RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING,
RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING,
RecoverySettings.INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,13 @@ public void messageReceived(final FileChunkRequest request, TransportChannel cha
try (ReplicationRef<RecoveryTarget> recoveryRef = onGoingRecoveries.getSafe(request.recoveryId(), request.shardId())) {
final RecoveryTarget recoveryTarget = recoveryRef.get();
final ActionListener<Void> listener = recoveryTarget.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
recoveryTarget.handleFileChunk(request, recoveryTarget, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
recoveryTarget.handleFileChunk(
request,
recoveryTarget,
bytesSinceLastPause,
recoverySettings.recoveryRateLimiter(),
listener
);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@
Property.NodeScope
);

/**
* Individual speed setting for segment replication, default -1B to reuse the setting of recovery.
*/
public static final Setting<ByteSizeValue> INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
"indices.replication.max_bytes_per_sec",
new ByteSizeValue(-1),
Property.Dynamic,
Property.NodeScope
);

/**
* Controls the maximum number of file chunk requests that can be sent concurrently from the source node to the target node.
*/
Expand Down Expand Up @@ -169,11 +179,13 @@
// choose 512KB-16B to ensure that the resulting byte[] is not a humongous allocation in G1.
public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512 * 1024 - 16, ByteSizeUnit.BYTES);

private volatile ByteSizeValue maxBytesPerSec;
private volatile ByteSizeValue recoveryMaxBytesPerSec;
private volatile ByteSizeValue replicationMaxBytesPerSec;
private volatile int maxConcurrentFileChunks;
private volatile int maxConcurrentOperations;
private volatile int maxConcurrentRemoteStoreStreams;
private volatile SimpleRateLimiter rateLimiter;
private volatile SimpleRateLimiter recoveryRateLimiter;
private volatile SimpleRateLimiter replicationRateLimiter;
private volatile TimeValue retryDelayStateSync;
private volatile TimeValue retryDelayNetwork;
private volatile TimeValue activityTimeout;
Expand All @@ -198,17 +210,20 @@
this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings);

this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (maxBytesPerSec.getBytes() <= 0) {
rateLimiter = null;
this.recoveryMaxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
recoveryRateLimiter = null;

Check warning on line 215 in server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java#L215

Added line #L215 was not covered by tests
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
this.replicationMaxBytesPerSec = INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.get(settings);
updateReplicationRateLimiter();

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
logger.debug("using recovery max_bytes_per_sec[{}]", recoveryMaxBytesPerSec);
this.internalRemoteUploadTimeout = INDICES_INTERNAL_REMOTE_UPLOAD_TIMEOUT.get(settings);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setRecoveryMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING, this::setReplicationMaxBytesPerSec);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
clusterSettings.addSettingsUpdateConsumer(
Expand All @@ -227,8 +242,12 @@

}

public RateLimiter rateLimiter() {
return rateLimiter;
public RateLimiter recoveryRateLimiter() {
return recoveryRateLimiter;
}

public RateLimiter replicationRateLimiter() {
return replicationRateLimiter;
}

public TimeValue retryDelayNetwork() {
Expand Down Expand Up @@ -294,14 +313,40 @@
this.internalRemoteUploadTimeout = internalRemoteUploadTimeout;
}

private void setMaxBytesPerSec(ByteSizeValue maxBytesPerSec) {
this.maxBytesPerSec = maxBytesPerSec;
if (maxBytesPerSec.getBytes() <= 0) {
rateLimiter = null;
} else if (rateLimiter != null) {
rateLimiter.setMBPerSec(maxBytesPerSec.getMbFrac());
private void setRecoveryMaxBytesPerSec(ByteSizeValue recoveryMaxBytesPerSec) {
this.recoveryMaxBytesPerSec = recoveryMaxBytesPerSec;
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
recoveryRateLimiter = null;
} else if (recoveryRateLimiter != null) {
recoveryRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
recoveryRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
if (replicationMaxBytesPerSec.getBytes() < 0) updateReplicationRateLimiter();
}

private void setReplicationMaxBytesPerSec(ByteSizeValue replicationMaxBytesPerSec) {
this.replicationMaxBytesPerSec = replicationMaxBytesPerSec;
updateReplicationRateLimiter();
}

private void updateReplicationRateLimiter() {
if (replicationMaxBytesPerSec.getBytes() >= 0) {
if (replicationMaxBytesPerSec.getBytes() == 0) {
replicationRateLimiter = null;
} else if (replicationRateLimiter != null) {
replicationRateLimiter.setMBPerSec(replicationMaxBytesPerSec.getMbFrac());
} else {
replicationRateLimiter = new SimpleRateLimiter(replicationMaxBytesPerSec.getMbFrac());

Check warning on line 340 in server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/recovery/RecoverySettings.java#L340

Added line #L340 was not covered by tests
}
} else { // when replicationMaxBytesPerSec = -1B, use setting of recovery
if (recoveryMaxBytesPerSec.getBytes() <= 0) {
replicationRateLimiter = null;
} else if (replicationRateLimiter != null) {
replicationRateLimiter.setMBPerSec(recoveryMaxBytesPerSec.getMbFrac());
} else {
replicationRateLimiter = new SimpleRateLimiter(recoveryMaxBytesPerSec.getMbFrac());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public RemoteRecoveryTargetHandler(
shardId,
PeerRecoveryTargetService.Actions.FILE_CHUNK,
requestSeqNoGenerator,
onSourceThrottle
onSourceThrottle,
recoverySettings::recoveryRateLimiter
);
this.remoteStoreEnabled = remoteStoreEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* This class handles sending file chunks over the transport layer to a target shard.
Expand All @@ -36,11 +37,11 @@ public final class RemoteSegmentFileChunkWriter implements FileChunkWriter {
private final AtomicLong requestSeqNoGenerator;
private final RetryableTransportClient retryableTransportClient;
private final ShardId shardId;
private final RecoverySettings recoverySettings;
private final long replicationId;
private final AtomicLong bytesSinceLastPause = new AtomicLong();
private final TransportRequestOptions fileChunkRequestOptions;
private final Consumer<Long> onSourceThrottle;
private final Supplier<RateLimiter> rateLimiterSupplier;
private final String action;

public RemoteSegmentFileChunkWriter(
Expand All @@ -50,14 +51,15 @@ public RemoteSegmentFileChunkWriter(
ShardId shardId,
String action,
AtomicLong requestSeqNoGenerator,
Consumer<Long> onSourceThrottle
Consumer<Long> onSourceThrottle,
Supplier<RateLimiter> rateLimiterSupplier
) {
this.replicationId = replicationId;
this.recoverySettings = recoverySettings;
this.retryableTransportClient = retryableTransportClient;
this.shardId = shardId;
this.requestSeqNoGenerator = requestSeqNoGenerator;
this.onSourceThrottle = onSourceThrottle;
this.rateLimiterSupplier = rateLimiterSupplier;
this.fileChunkRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout())
Expand All @@ -78,7 +80,7 @@ public void writeFileChunk(
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
// always fetch the ratelimiter - it might be updated in real-time on the recovery settings
final RateLimiter rl = recoverySettings.rateLimiter();
final RateLimiter rl = rateLimiterSupplier.get();
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@
request.getCheckpoint().getShardId(),
SegmentReplicationTargetService.Actions.FILE_CHUNK,
new AtomicLong(0),
(throttleTime) -> {}
(throttleTime) -> {},

Check warning on line 127 in server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationSourceService.java#L127

Added line #L127 was not covered by tests
recoverySettings::replicationRateLimiter
);
final CopyState copyState = ongoingSegmentReplications.prepareForReplication(request, segmentSegmentFileChunkWriter);
channel.sendResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@
try (ReplicationRef<SegmentReplicationTarget> ref = onGoingReplications.getSafe(request.recoveryId(), request.shardId())) {
final SegmentReplicationTarget target = ref.get();
final ActionListener<Void> listener = target.createOrFinishListener(channel, Actions.FILE_CHUNK, request);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.rateLimiter(), listener);
target.handleFileChunk(request, target, bytesSinceLastPause, recoverySettings.replicationRateLimiter(), listener);

Check warning on line 638 in server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java#L638

Added line #L638 was not covered by tests
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3162,7 +3162,7 @@
public InputStream maybeRateLimitRestores(InputStream stream) {
return maybeRateLimit(
maybeRateLimit(stream, () -> restoreRateLimiter, restoreRateLimitingTimeInNanos, BlobStoreTransferContext.SNAPSHOT_RESTORE),
recoverySettings::rateLimiter,
recoverySettings::recoveryRateLimiter,
restoreRateLimitingTimeInNanos,
BlobStoreTransferContext.SNAPSHOT_RESTORE
);
Expand All @@ -3185,7 +3185,7 @@
remoteDownloadRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_DOWNLOAD
),
recoverySettings::rateLimiter,
recoverySettings::recoveryRateLimiter,

Check warning on line 3188 in server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java#L3188

Added line #L3188 was not covered by tests
remoteDownloadRateLimitingTimeInNanos,
BlobStoreTransferContext.REMOTE_DOWNLOAD
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.TimeUnit;
Expand All @@ -47,7 +49,27 @@ public void testZeroBytesPerSecondIsNoRateLimit() {
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertEquals(null, recoverySettings.rateLimiter());
assertNull(recoverySettings.recoveryRateLimiter());
clusterSettings.applySettings(
Settings.builder().put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), 0).build()
);
assertNull(recoverySettings.replicationRateLimiter());
}

public void testSetReplicationMaxBytesPerSec() {
assertEquals(40, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(60, ByteSizeUnit.MB))
.build()
);
assertEquals(60, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
clusterSettings.applySettings(
Settings.builder()
.put(RecoverySettings.INDICES_REPLICATION_MAX_BYTES_PER_SEC_SETTING.getKey(), new ByteSizeValue(80, ByteSizeUnit.MB))
.build()
);
assertEquals(80, (int) recoverySettings.replicationRateLimiter().getMBPerSec());
}

public void testRetryDelayStateSync() {
Expand Down
Loading