Skip to content

Commit

Permalink
Adjust indices.recovery.max_bytes_per_sec according to external setti…
Browse files Browse the repository at this point in the history
…ngs (elastic#82819)

Today the setting indices.recovery.max_bytes_per_sec defaults to different 
values depending on the node roles, the JVM version and the system total 
memory that can be detected.

The current logic to set the default value can be summarized as:

    40 MB for non-data nodes
    40 MB for data nodes that runs on a JVM version < 14
    40 MB for data nodes that have one of the data_hot, data_warm, data_content or data roles

Nodes with only data_cold and/or data_frozen roles as data roles have a 
default value that depends of the available memory:

    with ≤ 4 GB of available memory, the default is 40 MB
    with more than 4 GB and less or equal to 8 GB, the default is 60 MB
    with more than 8 GB and less or equal to 16 GB, the default is 90 MB
    with more than 16 GB and less or equal to 32 GB, the default is 125 MB
    and above 32 GB, the default is 250 MB

While those defaults served us well, we want to evaluate if we can define 
more appropriate defaults if Elasticsearch were to know better the limits 
(or properties) of the hardware it is running on - something that Elasticsearch 
cannot extract by itself but can derive from settings that are provided at startup.

This pull request introduces the following new node settings:

    node.bandwidth.recovery.network
    node.bandwidth.recovery.disk.read
    node.bandwidth.recovery.disk.write

Those settings are not dynamic and must be set before the node starts. 
When they are set Elasticsearch detects the minimum available bandwidth 
among the network, disk read and disk write available bandwidths and computes 
a maximum bytes per seconds limit that will be a fraction of the min. available 
bandwidth. By default 40% of the min. bandwidth is used but that can be 
dynamically configured by an operator 
(using the node.bandwidth.recovery.operator.factor setting) or by the user 
directly (using a different setting node.bandwidth.recovery.factor).

The limit computed from available bandwidths is then compared to pre existing 
limitations like the one set through the indices.recovery.max_bytes_per_sec setting 
or the one that is computed by Elasticsearch from the node's physical memory 
on dedicated cold/frozen nodes. Elasticsearch will try to use the highest possible 
limit among those values, while not exceeding an overcommit ratio that is also 
defined through a node setting 
(see node.bandwidth.recovery.operator.factor.max_overcommit).

This overcommit ratio is here to prevent the rate limit to be set to a value that is 
greater than 100 times (by default) the minimum available bandwidth.
  • Loading branch information
tlrx authored Feb 1, 2022
1 parent fd0e252 commit 58fe36c
Show file tree
Hide file tree
Showing 5 changed files with 528 additions and 42 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/82819.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 82819
summary: "[Draft] Adjust `indices.recovery.max_bytes_per_sec` according to external\
\ settings"
area: Recovery
type: enhancement
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,14 @@ public void apply(Settings value, Settings current, Settings previous) {
RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_FACTOR_READ_SETTING,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_FACTOR_WRITE_SETTING,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_READ_SETTING,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_WRITE_SETTING,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING,
RecoverySettings.NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
Expand Down
51 changes: 29 additions & 22 deletions server/src/main/java/org/elasticsearch/common/settings/Setting.java
Original file line number Diff line number Diff line change
Expand Up @@ -1979,28 +1979,35 @@ public static Setting<Double> doubleSetting(String key, double defaultValue, dou
}

public static Setting<Double> doubleSetting(String key, double defaultValue, double minValue, double maxValue, Property... properties) {
return new Setting<>(key, (s) -> Double.toString(defaultValue), (s) -> {
final double d = Double.parseDouble(s);
if (d < minValue) {
String err = "Failed to parse value"
+ (isFiltered(properties) ? "" : " [" + s + "]")
+ " for setting ["
+ key
+ "] must be >= "
+ minValue;
throw new IllegalArgumentException(err);
}
if (d > maxValue) {
String err = "Failed to parse value"
+ (isFiltered(properties) ? "" : " [" + s + "]")
+ " for setting ["
+ key
+ "] must be <= "
+ maxValue;
throw new IllegalArgumentException(err);
}
return d;
}, properties);
return new Setting<>(
key,
(s) -> Double.toString(defaultValue),
(s) -> parseDouble(s, minValue, maxValue, key, properties),
properties
);
}

public static Double parseDouble(String s, double minValue, double maxValue, String key, Property... properties) {
final double d = Double.parseDouble(s);
if (d < minValue) {
String err = "Failed to parse value"
+ (isFiltered(properties) ? "" : " [" + s + "]")
+ " for setting ["
+ key
+ "] must be >= "
+ minValue;
throw new IllegalArgumentException(err);
}
if (d > maxValue) {
String err = "Failed to parse value"
+ (isFiltered(properties) ? "" : " [" + s + "]")
+ " for setting ["
+ key
+ "] must be <= "
+ maxValue;
throw new IllegalArgumentException(err);
}
return d;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeRole;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
Expand All @@ -35,8 +37,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static org.elasticsearch.common.settings.Setting.parseInt;
import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;

public class RecoverySettings {
public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0;
Expand Down Expand Up @@ -66,7 +70,132 @@ public class RecoverySettings {
Property.NodeScope
);

public static final ByteSizeValue DEFAULT_MAX_BYTES_PER_SEC = new ByteSizeValue(40L, ByteSizeUnit.MB);
/**
* Disk's write bandwidth allocated for this node. This bandwidth is expressed for write operations that have the default block size of
* {@link #DEFAULT_CHUNK_SIZE}.
*/
public static final Setting<ByteSizeValue> NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING = bandwidthSetting(
"node.bandwidth.recovery.disk.write"
);

/**
* Disk's read bandwidth allocated for this node. This bandwidth is expressed for read operations that have the default block size of
* {@link #DEFAULT_CHUNK_SIZE}.
*/
public static final Setting<ByteSizeValue> NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING = bandwidthSetting(
"node.bandwidth.recovery.disk.read"
);

/**
* Network's read bandwidth allocated for this node.
*/
public static final Setting<ByteSizeValue> NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING = bandwidthSetting(
"node.bandwidth.recovery.network"
);

static final double DEFAULT_FACTOR_VALUE = 0.4d;

/**
* Default factor as defined by the operator.
*/
public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING = operatorFactorSetting(
"node.bandwidth.recovery.operator.factor",
DEFAULT_FACTOR_VALUE
);

public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_WRITE_SETTING = factorSetting(
"node.bandwidth.recovery.operator.factor.write",
NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING
);

public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_READ_SETTING = factorSetting(
"node.bandwidth.recovery.operator.factor.read",
NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING
);

public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_MAX_OVERCOMMIT_SETTING = Setting.doubleSetting(
"node.bandwidth.recovery.operator.factor.max_overcommit",
100d, // high default overcommit
1d,
Double.MAX_VALUE,
Property.NodeScope,
Property.OperatorDynamic
);

public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_FACTOR_WRITE_SETTING = factorSetting(
"node.bandwidth.recovery.factor.write",
NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_WRITE_SETTING
);

public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_FACTOR_READ_SETTING = factorSetting(
"node.bandwidth.recovery.factor.read",
NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_READ_SETTING
);

static final List<Setting<?>> NODE_BANDWIDTH_RECOVERY_SETTINGS = List.of(
NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING,
NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING,
NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING
);

/**
* Bandwidth settings have a default value of -1 (meaning that they are undefined) or a value in (0, Long.MAX_VALUE).
*/
private static Setting<ByteSizeValue> bandwidthSetting(String key) {
return new Setting<>(key, s -> ByteSizeValue.MINUS_ONE.getStringRep(), s -> {
final ByteSizeValue value = ByteSizeValue.parseBytesSizeValue(s, key);
if (ByteSizeValue.MINUS_ONE.equals(value)) {
return value;
}
if (value.getBytes() <= 0L) {
throw new IllegalArgumentException(
"Failed to parse value ["
+ s
+ "] for bandwidth setting ["
+ key
+ "], must be > ["
+ ByteSizeValue.ZERO.getStringRep()
+ ']'
);
}
if (value.getBytes() >= Long.MAX_VALUE) {
throw new IllegalArgumentException(
"Failed to parse value ["
+ s
+ "] for bandwidth setting ["
+ key
+ "], must be < ["
+ new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES).getStringRep()
+ ']'
);
}
return value;
}, Property.NodeScope);
}

/**
* Operator-defined factors have a value in (0.0, 1.0]
*/
private static Setting<Double> operatorFactorSetting(String key, double defaultValue) {
return new Setting<>(key, Double.toString(defaultValue), s -> Setting.parseDouble(s, 0d, 1d, key), v -> {
if (v == 0d) {
throw new IllegalArgumentException("Failed to validate value [" + v + "] for factor setting [" + key + "] must be > [0]");
}
}, Property.NodeScope, Property.OperatorDynamic);
}

/**
* User-defined factors have a value in (0.0, 1.0] and fall back to a corresponding operator factor setting.
*/
private static Setting<Double> factorSetting(String key, Setting<Double> operatorFallback) {
return new Setting<>(key, operatorFallback, s -> Setting.parseDouble(s, 0d, 1d, key), v -> {
if (v == 0d) {
throw new IllegalArgumentException("Failed to validate value [" + v + "] for factor setting [" + key + "] must be > [0]");
}
}, Property.NodeScope, Property.Dynamic);
}

static final ByteSizeValue DEFAULT_MAX_BYTES_PER_SEC = new ByteSizeValue(40L, ByteSizeUnit.MB);

public static final Setting<ByteSizeValue> INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
"indices.recovery.max_bytes_per_sec",
Expand Down Expand Up @@ -277,6 +406,10 @@ public Iterator<Setting<?>> settings() {

private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;

private final ByteSizeValue availableNetworkBandwidth;
private final ByteSizeValue availableDiskReadBandwidth;
private final ByteSizeValue availableDiskWriteBandwidth;

public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
Expand All @@ -288,22 +421,34 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
this.internalActionTimeout = INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.get(settings);
this.internalActionRetryTimeout = INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING.get(settings);
this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings);

this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
if (maxBytesPerSec.getBytes() <= 0) {
rateLimiter = null;
} else {
rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
}
this.useSnapshotsDuringRecovery = INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.get(settings);
this.maxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings);
this.maxConcurrentSnapshotFileDownloadsPerNode = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.get(settings);
this.maxSnapshotFileDownloadsPerNodeSemaphore = new AdjustableSemaphore(this.maxConcurrentSnapshotFileDownloadsPerNode, true);

logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);

clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
this.availableNetworkBandwidth = NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING.get(settings);
this.availableDiskReadBandwidth = NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING.get(settings);
this.availableDiskWriteBandwidth = NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING.get(settings);
validateNodeBandwidthRecoverySettings(settings);
computeMaxBytesPerSec(settings);
if (DiscoveryNode.canContainData(settings)) {
clusterSettings.addSettingsUpdateConsumer(
this::computeMaxBytesPerSec,
List.of(
INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
NODE_BANDWIDTH_RECOVERY_FACTOR_READ_SETTING,
NODE_BANDWIDTH_RECOVERY_FACTOR_WRITE_SETTING,
NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING,
NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_READ_SETTING,
NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_WRITE_SETTING,
// non dynamic settings but they are used to update max bytes per sec
NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING,
NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING,
NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING,
NODE_ROLES_SETTING
)
);
}
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync);
Expand All @@ -325,6 +470,75 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
);
}

private void computeMaxBytesPerSec(Settings settings) {
// limit as computed before 8.1.0
final long defaultBytesPerSec = Math.max(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings).getBytes(), 0L);

// available network bandwidth
final long networkBandwidthBytesPerSec = Math.max(availableNetworkBandwidth.getBytes(), 0L);

// read bandwidth
final long readBytesPerSec;
if (availableDiskReadBandwidth.getBytes() > 0L && networkBandwidthBytesPerSec > 0L) {
double readFactor = NODE_BANDWIDTH_RECOVERY_FACTOR_READ_SETTING.get(settings);
readBytesPerSec = Math.round(Math.min(availableDiskReadBandwidth.getBytes(), networkBandwidthBytesPerSec) * readFactor);
} else {
readBytesPerSec = 0L;
}

// write bandwidth
final long writeBytesPerSec;
if (availableDiskWriteBandwidth.getBytes() > 0L && networkBandwidthBytesPerSec > 0L) {
double writeFactor = NODE_BANDWIDTH_RECOVERY_FACTOR_WRITE_SETTING.get(settings);
writeBytesPerSec = Math.round(Math.min(availableDiskWriteBandwidth.getBytes(), networkBandwidthBytesPerSec) * writeFactor);
} else {
writeBytesPerSec = 0L;
}

final long availableBytesPerSec = Math.min(readBytesPerSec, writeBytesPerSec);

long maxBytesPerSec;
if (availableBytesPerSec == 0L // no node recovery bandwidths
|| INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.exists(settings) // when set this setting overrides node recovery bandwidths
|| DiscoveryNode.canContainData(settings) == false) { // keep previous behavior for non data nodes
maxBytesPerSec = defaultBytesPerSec;
} else {
maxBytesPerSec = Math.max(defaultBytesPerSec, availableBytesPerSec);
}

final long maxAllowedBytesPerSec = Math.round(
Math.max(
Math.min(
Math.min(availableDiskReadBandwidth.getBytes(), availableDiskWriteBandwidth.getBytes()),
networkBandwidthBytesPerSec
),
0L
) * NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_MAX_OVERCOMMIT_SETTING.get(settings)
);

ByteSizeValue finalMaxBytesPerSec;
if (maxAllowedBytesPerSec > 0L) {
if (maxBytesPerSec > 0L) {
finalMaxBytesPerSec = ByteSizeValue.ofBytes(Math.min(maxBytesPerSec, maxAllowedBytesPerSec));
} else {
finalMaxBytesPerSec = ByteSizeValue.ofBytes(maxAllowedBytesPerSec);
}
} else {
finalMaxBytesPerSec = ByteSizeValue.ofBytes(maxBytesPerSec);
}
logger.info(
() -> new ParameterizedMessage(
"using rate limit [{}] with [default={}, read={}, write={}, max={}]",
finalMaxBytesPerSec,
ByteSizeValue.ofBytes(defaultBytesPerSec),
ByteSizeValue.ofBytes(readBytesPerSec),
ByteSizeValue.ofBytes(writeBytesPerSec),
ByteSizeValue.ofBytes(maxAllowedBytesPerSec)
)
);
setMaxBytesPerSec(finalMaxBytesPerSec);
}

public RateLimiter rateLimiter() {
return rateLimiter;
}
Expand Down Expand Up @@ -458,4 +672,20 @@ Releasable tryAcquireSnapshotDownloadPermits() {

return Releasables.releaseOnce(() -> maxSnapshotFileDownloadsPerNodeSemaphore.release(maxConcurrentSnapshotFileDownloads));
}

private static void validateNodeBandwidthRecoverySettings(Settings settings) {
final List<String> nonDefaults = NODE_BANDWIDTH_RECOVERY_SETTINGS.stream()
.filter(setting -> setting.get(settings) != ByteSizeValue.MINUS_ONE)
.map(Setting::getKey)
.collect(Collectors.toList());
if (nonDefaults.isEmpty() == false && nonDefaults.size() != NODE_BANDWIDTH_RECOVERY_SETTINGS.size()) {
throw new IllegalArgumentException(
"Settings "
+ NODE_BANDWIDTH_RECOVERY_SETTINGS.stream().map(Setting::getKey).collect(Collectors.toList())
+ " must all be defined or all be undefined; but only settings "
+ nonDefaults
+ " are configured."
);
}
}
}
Loading

0 comments on commit 58fe36c

Please sign in to comment.