From 58fe36c1cffe8ca0ad55d7941e59924be34ccbd7 Mon Sep 17 00:00:00 2001
From: Tanguy Leroux
Date: Tue, 1 Feb 2022 08:59:47 +0100
Subject: [PATCH] Adjust indices.recovery.max_bytes_per_sec according to
 external settings (#82819)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Today the setting indices.recovery.max_bytes_per_sec defaults to a value
that depends on the node's roles, the JVM version, and the total system
memory that can be detected. The current logic for the default value can
be summarized as:

- 40 MB for non-data nodes;
- 40 MB for data nodes that run on a JVM version < 14;
- 40 MB for data nodes that have one of the data_hot, data_warm,
  data_content or data roles;
- nodes whose only data roles are data_cold and/or data_frozen have a
  default value that depends on the available memory:
  - with 4 GB of available memory or less, the default is 40 MB;
  - with more than 4 GB and up to 8 GB, the default is 60 MB;
  - with more than 8 GB and up to 16 GB, the default is 90 MB;
  - with more than 16 GB and up to 32 GB, the default is 125 MB;
  - above 32 GB, the default is 250 MB.

While those defaults have served us well, we want to evaluate whether we
can define more appropriate defaults if Elasticsearch knew more about the
limits (or properties) of the hardware it is running on - something that
Elasticsearch cannot detect by itself but can derive from settings that
are provided at startup.

This pull request introduces the following new node settings:

- node.bandwidth.recovery.network
- node.bandwidth.recovery.disk.read
- node.bandwidth.recovery.disk.write

These settings are not dynamic and must be set before the node starts.
When they are set, Elasticsearch detects the minimum available bandwidth
among the network, disk read and disk write bandwidths, and computes a
maximum bytes-per-second limit that is a fraction of that minimum. By
default 40% of the minimum bandwidth is used, but the fraction can be
configured dynamically by an operator (using the
node.bandwidth.recovery.operator.factor setting) or directly by the user
(using a different setting, node.bandwidth.recovery.factor).

The limit computed from the available bandwidths is then compared to
pre-existing limits such as the one set through the
indices.recovery.max_bytes_per_sec setting, or the one that Elasticsearch
computes from the node's physical memory on dedicated cold/frozen nodes.
Elasticsearch tries to use the highest limit among those values, while
never exceeding an overcommit ratio that is also defined through a node
setting (see node.bandwidth.recovery.operator.factor.max_overcommit).
This overcommit ratio prevents the rate limit from being set to a value
greater than 100 times (by default) the minimum available bandwidth.
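
To illustrate the computation described above with concrete numbers (the
snippet below is a standalone sketch written for this description, not
code from this change; the bandwidth values are made up):

    // Hypothetical inputs: 1024 MB/s network, 500 MB/s disk read, 250 MB/s disk write.
    public class RecoveryLimitExample {
        public static void main(String[] args) {
            final long mb = 1024L * 1024L;
            final long network = 1024L * mb;        // node.bandwidth.recovery.network
            final long diskRead = 500L * mb;        // node.bandwidth.recovery.disk.read
            final long diskWrite = 250L * mb;       // node.bandwidth.recovery.disk.write
            final double factor = 0.4d;             // default node.bandwidth.recovery.operator.factor
            final double maxOvercommit = 100d;      // default node.bandwidth.recovery.operator.factor.max_overcommit
            final long preexistingLimit = 40L * mb; // pre-existing default of indices.recovery.max_bytes_per_sec

            // 40% of the minimum available bandwidth: 0.4 * min(1024, 500, 250) = 100 MB/s.
            final long minBandwidth = Math.min(network, Math.min(diskRead, diskWrite));
            final long computed = Math.round(minBandwidth * factor);
            // The highest limit wins, but it may never exceed 100x the minimum bandwidth.
            final long limit = Math.min(Math.max(preexistingLimit, computed), Math.round(minBandwidth * maxOvercommit));
            System.out.println((limit / mb) + " MB/s"); // prints: 100 MB/s
        }
    }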
---
 docs/changelog/82819.yaml                      |   6 +
 .../common/settings/ClusterSettings.java       |   8 +
 .../common/settings/Setting.java               |  51 ++--
 .../indices/recovery/RecoverySettings.java     | 254 +++++++++++++++++-
 .../recovery/RecoverySettingsTests.java        | 251 ++++++++++++++++-
 5 files changed, 528 insertions(+), 42 deletions(-)
 create mode 100644 docs/changelog/82819.yaml

diff --git a/docs/changelog/82819.yaml b/docs/changelog/82819.yaml
new file mode 100644
index 000000000000..64953328e87f
--- /dev/null
+++ b/docs/changelog/82819.yaml
@@ -0,0 +1,6 @@
+pr: 82819
+summary: "Adjust `indices.recovery.max_bytes_per_sec` according to external\
+  \ settings"
+area: Recovery
+type: enhancement
+issues: []
diff --git a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
index f46de609296d..ffc4da16844a 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -217,6 +217,14 @@ public void apply(Settings value, Settings current, Settings previous) {
             RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING,
             RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS,
             RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE,
+            RecoverySettings.NODE_BANDWIDTH_RECOVERY_FACTOR_READ_SETTING,
+            RecoverySettings.NODE_BANDWIDTH_RECOVERY_FACTOR_WRITE_SETTING,
+            RecoverySettings.NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING,
+            RecoverySettings.NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_READ_SETTING,
+            RecoverySettings.NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_WRITE_SETTING,
+            RecoverySettings.NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING,
+            RecoverySettings.NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING,
+            RecoverySettings.NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING,
             ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING,
             ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING,
             ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING,
diff --git a/server/src/main/java/org/elasticsearch/common/settings/Setting.java b/server/src/main/java/org/elasticsearch/common/settings/Setting.java
index 60ebf2a3011b..a999cb8e4b98 100644
--- a/server/src/main/java/org/elasticsearch/common/settings/Setting.java
+++ b/server/src/main/java/org/elasticsearch/common/settings/Setting.java
@@ -1979,28 +1979,35 @@ public static Setting<Double> doubleSetting(String key, double defaultValue, dou
     }
 
     public static Setting<Double> doubleSetting(String key, double defaultValue, double minValue, double maxValue, Property... properties) {
-        return new Setting<>(key, (s) -> Double.toString(defaultValue), (s) -> {
-            final double d = Double.parseDouble(s);
-            if (d < minValue) {
-                String err = "Failed to parse value"
-                    + (isFiltered(properties) ? "" : " [" + s + "]")
-                    + " for setting ["
-                    + key
-                    + "] must be >= "
-                    + minValue;
-                throw new IllegalArgumentException(err);
-            }
-            if (d > maxValue) {
-                String err = "Failed to parse value"
-                    + (isFiltered(properties) ? "" : " [" + s + "]")
-                    + " for setting ["
-                    + key
-                    + "] must be <= "
-                    + maxValue;
-                throw new IllegalArgumentException(err);
-            }
-            return d;
-        }, properties);
+        return new Setting<>(
+            key,
+            (s) -> Double.toString(defaultValue),
+            (s) -> parseDouble(s, minValue, maxValue, key, properties),
+            properties
+        );
+    }
+
+    public static Double parseDouble(String s, double minValue, double maxValue, String key, Property... properties) {
+        final double d = Double.parseDouble(s);
+        if (d < minValue) {
+            String err = "Failed to parse value"
+                + (isFiltered(properties) ? "" : " [" + s + "]")
+                + " for setting ["
+                + key
+                + "] must be >= "
+                + minValue;
+            throw new IllegalArgumentException(err);
+        }
+        if (d > maxValue) {
+            String err = "Failed to parse value"
+                + (isFiltered(properties) ? "" : " [" + s + "]")
+                + " for setting ["
+                + key
+                + "] must be <= "
+                + maxValue;
+            throw new IllegalArgumentException(err);
+        }
+        return d;
     }
 
     @Override
diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java
index dd306a4d401d..9ee29769ef51 100644
--- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java
+++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java
@@ -10,9 +10,11 @@
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.lucene.store.RateLimiter;
 import org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
@@ -35,8 +37,10 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.common.settings.Setting.parseInt;
+import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
 
 public class RecoverySettings {
 
     public static final Version SNAPSHOT_RECOVERIES_SUPPORTED_VERSION = Version.V_7_15_0;
@@ -66,7 +70,132 @@ public class RecoverySettings {
         Property.NodeScope
     );
 
-    public static final ByteSizeValue DEFAULT_MAX_BYTES_PER_SEC = new ByteSizeValue(40L, ByteSizeUnit.MB);
+    /**
+     * Disk's write bandwidth allocated for this node. This bandwidth is expressed for write operations that have the default block size of
+     * {@link #DEFAULT_CHUNK_SIZE}.
+     */
+    public static final Setting<ByteSizeValue> NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING = bandwidthSetting(
+        "node.bandwidth.recovery.disk.write"
+    );
+
+    /**
+     * Disk's read bandwidth allocated for this node. This bandwidth is expressed for read operations that have the default block size of
+     * {@link #DEFAULT_CHUNK_SIZE}.
+     */
+    public static final Setting<ByteSizeValue> NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING = bandwidthSetting(
+        "node.bandwidth.recovery.disk.read"
+    );
+
+    /**
+     * Network's read bandwidth allocated for this node.
+     */
+    public static final Setting<ByteSizeValue> NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING = bandwidthSetting(
+        "node.bandwidth.recovery.network"
+    );
+
+    static final double DEFAULT_FACTOR_VALUE = 0.4d;
+
+    /**
+     * Default factor as defined by the operator.
+     */
+    public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING = operatorFactorSetting(
+        "node.bandwidth.recovery.operator.factor",
+        DEFAULT_FACTOR_VALUE
+    );
+
+    public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_WRITE_SETTING = factorSetting(
+        "node.bandwidth.recovery.operator.factor.write",
+        NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING
+    );
+
+    public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_READ_SETTING = factorSetting(
+        "node.bandwidth.recovery.operator.factor.read",
+        NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING
+    );
+
+    public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_MAX_OVERCOMMIT_SETTING = Setting.doubleSetting(
+        "node.bandwidth.recovery.operator.factor.max_overcommit",
+        100d, // high default overcommit
+        1d,
+        Double.MAX_VALUE,
+        Property.NodeScope,
+        Property.OperatorDynamic
+    );
+
+    public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_FACTOR_WRITE_SETTING = factorSetting(
+        "node.bandwidth.recovery.factor.write",
+        NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_WRITE_SETTING
+    );
+
+    public static final Setting<Double> NODE_BANDWIDTH_RECOVERY_FACTOR_READ_SETTING = factorSetting(
+        "node.bandwidth.recovery.factor.read",
+        NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_READ_SETTING
+    );
+
+    static final List<Setting<ByteSizeValue>> NODE_BANDWIDTH_RECOVERY_SETTINGS = List.of(
+        NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING,
+        NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING,
+        NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING
+    );
+
+    /**
+     * Bandwidth settings have a default value of -1 (meaning that they are undefined) or a value in (0, Long.MAX_VALUE).
+     */
+    private static Setting<ByteSizeValue> bandwidthSetting(String key) {
+        return new Setting<>(key, s -> ByteSizeValue.MINUS_ONE.getStringRep(), s -> {
+            final ByteSizeValue value = ByteSizeValue.parseBytesSizeValue(s, key);
+            if (ByteSizeValue.MINUS_ONE.equals(value)) {
+                return value;
+            }
+            if (value.getBytes() <= 0L) {
+                throw new IllegalArgumentException(
+                    "Failed to parse value ["
+                        + s
+                        + "] for bandwidth setting ["
+                        + key
+                        + "], must be > ["
+                        + ByteSizeValue.ZERO.getStringRep()
+                        + ']'
+                );
+            }
+            if (value.getBytes() >= Long.MAX_VALUE) {
+                throw new IllegalArgumentException(
+                    "Failed to parse value ["
+                        + s
+                        + "] for bandwidth setting ["
+                        + key
+                        + "], must be < ["
+                        + new ByteSizeValue(Long.MAX_VALUE, ByteSizeUnit.BYTES).getStringRep()
+                        + ']'
+                );
+            }
+            return value;
+        }, Property.NodeScope);
+    }
+
+    /**
+     * Operator-defined factors have a value in (0.0, 1.0]
+     */
+    private static Setting<Double> operatorFactorSetting(String key, double defaultValue) {
+        return new Setting<>(key, Double.toString(defaultValue), s -> Setting.parseDouble(s, 0d, 1d, key), v -> {
+            if (v == 0d) {
+                throw new IllegalArgumentException("Failed to validate value [" + v + "] for factor setting [" + key + "] must be > [0]");
+            }
+        }, Property.NodeScope, Property.OperatorDynamic);
+    }
+
+    /**
+     * User-defined factors have a value in (0.0, 1.0] and fall back to a corresponding operator factor setting.
+     */
+    private static Setting<Double> factorSetting(String key, Setting<Double> operatorFallback) {
+        return new Setting<>(key, operatorFallback, s -> Setting.parseDouble(s, 0d, 1d, key), v -> {
+            if (v == 0d) {
+                throw new IllegalArgumentException("Failed to validate value [" + v + "] for factor setting [" + key + "] must be > [0]");
+            }
+        }, Property.NodeScope, Property.Dynamic);
+    }
+
+    static final ByteSizeValue DEFAULT_MAX_BYTES_PER_SEC = new ByteSizeValue(40L, ByteSizeUnit.MB);
 
     public static final Setting<ByteSizeValue> INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING = Setting.byteSizeSetting(
         "indices.recovery.max_bytes_per_sec",
@@ -277,6 +406,10 @@ public Iterator<Setting<?>> settings() {
 
     private volatile ByteSizeValue chunkSize = DEFAULT_CHUNK_SIZE;
 
+    private final ByteSizeValue availableNetworkBandwidth;
+    private final ByteSizeValue availableDiskReadBandwidth;
+    private final ByteSizeValue availableDiskWriteBandwidth;
+
     public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
         this.retryDelayStateSync = INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING.get(settings);
         this.maxConcurrentFileChunks = INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING.get(settings);
@@ -288,22 +421,34 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
         this.internalActionTimeout = INDICES_RECOVERY_INTERNAL_ACTION_TIMEOUT_SETTING.get(settings);
         this.internalActionRetryTimeout = INDICES_RECOVERY_INTERNAL_ACTION_RETRY_TIMEOUT_SETTING.get(settings);
         this.internalActionLongTimeout = INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING.get(settings);
-
         this.activityTimeout = INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING.get(settings);
-        this.maxBytesPerSec = INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings);
-        if (maxBytesPerSec.getBytes() <= 0) {
-            rateLimiter = null;
-        } else {
-            rateLimiter = new SimpleRateLimiter(maxBytesPerSec.getMbFrac());
-        }
         this.useSnapshotsDuringRecovery = INDICES_RECOVERY_USE_SNAPSHOTS_SETTING.get(settings);
         this.maxConcurrentSnapshotFileDownloads = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS.get(settings);
         this.maxConcurrentSnapshotFileDownloadsPerNode = INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE.get(settings);
         this.maxSnapshotFileDownloadsPerNodeSemaphore = new AdjustableSemaphore(this.maxConcurrentSnapshotFileDownloadsPerNode, true);
-
-        logger.debug("using max_bytes_per_sec[{}]", maxBytesPerSec);
-
-        clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING, this::setMaxBytesPerSec);
+        this.availableNetworkBandwidth = NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING.get(settings);
+        this.availableDiskReadBandwidth = NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING.get(settings);
+        this.availableDiskWriteBandwidth = NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING.get(settings);
+        validateNodeBandwidthRecoverySettings(settings);
+        computeMaxBytesPerSec(settings);
+        if (DiscoveryNode.canContainData(settings)) {
+            clusterSettings.addSettingsUpdateConsumer(
+                this::computeMaxBytesPerSec,
+                List.of(
+                    INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING,
+                    NODE_BANDWIDTH_RECOVERY_FACTOR_READ_SETTING,
+                    NODE_BANDWIDTH_RECOVERY_FACTOR_WRITE_SETTING,
+                    NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING,
+                    NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_READ_SETTING,
+                    NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_WRITE_SETTING,
+                    // non dynamic settings but they are used to update max bytes per sec
+                    NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING,
+                    NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING,
+                    NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING,
+                    NODE_ROLES_SETTING
+                )
+            );
+        }
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, this::setMaxConcurrentFileChunks);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, this::setMaxConcurrentOperations);
         clusterSettings.addSettingsUpdateConsumer(INDICES_RECOVERY_RETRY_DELAY_STATE_SYNC_SETTING, this::setRetryDelayStateSync);
@@ -325,6 +470,75 @@ public RecoverySettings(Settings settings, ClusterSettings clusterSettings) {
         );
     }
 
+    private void computeMaxBytesPerSec(Settings settings) {
+        // limit as computed before 8.1.0
+        final long defaultBytesPerSec = Math.max(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.get(settings).getBytes(), 0L);
+
+        // available network bandwidth
+        final long networkBandwidthBytesPerSec = Math.max(availableNetworkBandwidth.getBytes(), 0L);
+
+        // read bandwidth
+        final long readBytesPerSec;
+        if (availableDiskReadBandwidth.getBytes() > 0L && networkBandwidthBytesPerSec > 0L) {
+            double readFactor = NODE_BANDWIDTH_RECOVERY_FACTOR_READ_SETTING.get(settings);
+            readBytesPerSec = Math.round(Math.min(availableDiskReadBandwidth.getBytes(), networkBandwidthBytesPerSec) * readFactor);
+        } else {
+            readBytesPerSec = 0L;
+        }
+
+        // write bandwidth
+        final long writeBytesPerSec;
+        if (availableDiskWriteBandwidth.getBytes() > 0L && networkBandwidthBytesPerSec > 0L) {
+            double writeFactor = NODE_BANDWIDTH_RECOVERY_FACTOR_WRITE_SETTING.get(settings);
+            writeBytesPerSec = Math.round(Math.min(availableDiskWriteBandwidth.getBytes(), networkBandwidthBytesPerSec) * writeFactor);
+        } else {
+            writeBytesPerSec = 0L;
+        }
+
+        final long availableBytesPerSec = Math.min(readBytesPerSec, writeBytesPerSec);
+
+        long maxBytesPerSec;
+        if (availableBytesPerSec == 0L // no node recovery bandwidths
+            || INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.exists(settings) // when set this setting overrides node recovery bandwidths
+            || DiscoveryNode.canContainData(settings) == false) { // keep previous behavior for non data nodes
+            maxBytesPerSec = defaultBytesPerSec;
+        } else {
+            maxBytesPerSec = Math.max(defaultBytesPerSec, availableBytesPerSec);
+        }
+
+        final long maxAllowedBytesPerSec = Math.round(
+            Math.max(
+                Math.min(
+                    Math.min(availableDiskReadBandwidth.getBytes(), availableDiskWriteBandwidth.getBytes()),
+                    networkBandwidthBytesPerSec
+                ),
+                0L
+            ) * NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_MAX_OVERCOMMIT_SETTING.get(settings)
+        );
+
+        ByteSizeValue finalMaxBytesPerSec;
+        if (maxAllowedBytesPerSec > 0L) {
+            if (maxBytesPerSec > 0L) {
+                finalMaxBytesPerSec = ByteSizeValue.ofBytes(Math.min(maxBytesPerSec, maxAllowedBytesPerSec));
+            } else {
+                finalMaxBytesPerSec = ByteSizeValue.ofBytes(maxAllowedBytesPerSec);
+            }
+        } else {
+            finalMaxBytesPerSec = ByteSizeValue.ofBytes(maxBytesPerSec);
+        }
+        logger.info(
+            () -> new ParameterizedMessage(
+                "using rate limit [{}] with [default={}, read={}, write={}, max={}]",
+                finalMaxBytesPerSec,
+                ByteSizeValue.ofBytes(defaultBytesPerSec),
+                ByteSizeValue.ofBytes(readBytesPerSec),
+                ByteSizeValue.ofBytes(writeBytesPerSec),
+                ByteSizeValue.ofBytes(maxAllowedBytesPerSec)
+            )
+        );
+        setMaxBytesPerSec(finalMaxBytesPerSec);
+    }
+
     public RateLimiter rateLimiter() {
         return rateLimiter;
     }
@@ -458,4 +672,20 @@ Releasable tryAcquireSnapshotDownloadPermits() {
         return Releasables.releaseOnce(() -> maxSnapshotFileDownloadsPerNodeSemaphore.release(maxConcurrentSnapshotFileDownloads));
     }
 
+    private static void validateNodeBandwidthRecoverySettings(Settings settings) {
+        final List<String> nonDefaults = NODE_BANDWIDTH_RECOVERY_SETTINGS.stream()
+            .filter(setting -> setting.get(settings) != ByteSizeValue.MINUS_ONE)
+            .map(Setting::getKey)
+            .collect(Collectors.toList());
+        if (nonDefaults.isEmpty() == false && nonDefaults.size() != NODE_BANDWIDTH_RECOVERY_SETTINGS.size()) {
+            throw new IllegalArgumentException(
+                "Settings "
+                    + NODE_BANDWIDTH_RECOVERY_SETTINGS.stream().map(Setting::getKey).collect(Collectors.toList())
+                    + " must all be defined or all be undefined; but only settings "
+                    + nonDefaults
+                    + " are configured."
+            );
+        }
+    }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java
index 6ba323a48769..2ee9ae83cc9d 100644
--- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java
+++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoverySettingsTests.java
@@ -8,7 +8,9 @@
 
 package org.elasticsearch.indices.recovery;
 
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -19,15 +21,24 @@
 
 import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
+import static org.elasticsearch.indices.recovery.RecoverySettings.DEFAULT_FACTOR_VALUE;
 import static org.elasticsearch.indices.recovery.RecoverySettings.DEFAULT_MAX_BYTES_PER_SEC;
 import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING;
 import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS;
 import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_SNAPSHOT_FILE_DOWNLOADS_PER_NODE;
 import static org.elasticsearch.indices.recovery.RecoverySettings.INDICES_RECOVERY_USE_SNAPSHOTS_SETTING;
 import static org.elasticsearch.indices.recovery.RecoverySettings.JAVA_VERSION_OVERRIDING_TEST_SETTING;
+import static org.elasticsearch.indices.recovery.RecoverySettings.NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING;
+import static org.elasticsearch.indices.recovery.RecoverySettings.NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING;
+import static org.elasticsearch.indices.recovery.RecoverySettings.NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING;
+import static org.elasticsearch.indices.recovery.RecoverySettings.NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_MAX_OVERCOMMIT_SETTING;
+import static org.elasticsearch.indices.recovery.RecoverySettings.NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING;
+import static org.elasticsearch.indices.recovery.RecoverySettings.NODE_BANDWIDTH_RECOVERY_SETTINGS;
 import static org.elasticsearch.indices.recovery.RecoverySettings.TOTAL_PHYSICAL_MEMORY_OVERRIDING_TEST_SETTING;
 import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
 import static org.hamcrest.Matchers.containsString;
@@ -105,10 +116,52 @@ public void testMaxConcurrentSnapshotFileDownloadsPerNodeIsValidated() {
         );
     }
 
+    public void testAvailableBandwidthsSettingsAreAllConfigured() {
+        final NodeRecoverySettings recoverySettings = nodeRecoverySettings();
+        recoverySettings.withRandomIndicesRecoveryMaxBytesPerSec();
+        recoverySettings.withRoles(randomDataNodeRoles());
+        recoverySettings.withRandomMemory();
+
+        final List<Setting<ByteSizeValue>> randomSettings = randomSubsetOf(
+            randomIntBetween(1, NODE_BANDWIDTH_RECOVERY_SETTINGS.size() - 1),
+            NODE_BANDWIDTH_RECOVERY_SETTINGS
+        );
+        for (Setting<ByteSizeValue> setting : randomSettings) {
+            if (setting.getKey().equals(NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING.getKey())) {
+                recoverySettings.withNetworkBandwidth(randomNonZeroByteSizeValue());
+            } else if (setting.getKey().equals(NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING.getKey())) {
+                recoverySettings.withDiskReadBandwidth(randomNonZeroByteSizeValue());
+            } else if (setting.getKey().equals(NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING.getKey())) {
+                recoverySettings.withDiskWriteBandwidth(randomNonZeroByteSizeValue());
+            } else {
+                throw new AssertionError();
+            }
+        }
+
+        final IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, recoverySettings::build);
+        assertThat(
+            exception.getMessage(),
+            containsString(
+                "Settings "
+                    + NODE_BANDWIDTH_RECOVERY_SETTINGS.stream().map(Setting::getKey).collect(Collectors.toList())
+                    + " must all be defined or all be undefined; but only settings "
+                    + NODE_BANDWIDTH_RECOVERY_SETTINGS.stream()
+                        .filter(randomSettings::contains)
+                        .map(Setting::getKey)
+                        .collect(Collectors.toList())
+                    + " are configured."
+            )
+        );
+    }
+
     public void testDefaultMaxBytesPerSecOnNonDataNode() {
         assertThat(
             "Non-data nodes have a default 40mb rate limit",
-            nodeRecoverySettings().withRole(randomFrom("master", "ingest", "ml")).withRandomMemory().build().getMaxBytesPerSec(),
+            nodeRecoverySettings().withRole(randomFrom("master", "ingest", "ml"))
+                .withRandomBandwidths()
+                .withRandomMemory()
+                .build()
+                .getMaxBytesPerSec(),
             equalTo(DEFAULT_MAX_BYTES_PER_SEC)
         );
     }
@@ -138,13 +191,11 @@ public void testDefaultMaxBytesPerSecOnDataNode() {
     }
 
     public void testMaxBytesPerSecOnDataNodeWithIndicesRecoveryMaxBytesPerSec() {
-        final Set<String> roles = new HashSet<>(randomSubsetOf(randomIntBetween(1, 4), "data", "data_hot", "data_warm", "data_content"));
-        roles.addAll(randomSubsetOf(Set.of("data_cold", "data_frozen")));
         final ByteSizeValue random = randomByteSizeValue();
         assertThat(
             "Data nodes that are not dedicated to cold/frozen should use the defined rate limit when set",
-            nodeRecoverySettings().withRoles(roles)
-                .withIndicesRecoveryMaxBytesPerSec(random)
+            nodeRecoverySettings().withIndicesRecoveryMaxBytesPerSec(random)
+                .withRoles(randomDataNodeRoles())
                 .withRandomMemory()
                 .build()
                 .getMaxBytesPerSec(),
@@ -152,6 +203,111 @@ public void testMaxBytesPerSecOnDataNodeWithIndicesRecoveryMaxBytesPerSec() {
         );
     }
 
+    public void testMaxBytesPerSecOnDataNodeWithIndicesRecoveryMaxBytesPerSecAndOvercommit() {
+        final Double maxOvercommitFactor = randomBoolean() ? randomDoubleBetween(1.0d, 100.0d, true) : null;
+        final ByteSizeValue indicesRecoveryMaxBytesPerSec = switch (randomInt(2)) {
+            case 0 -> ByteSizeValue.MINUS_ONE;
+            case 1 -> ByteSizeValue.ZERO;
+            case 2 -> ByteSizeValue.ofGb(between(100, 1000));
+            default -> throw new AssertionError();
+        };
+        assertThat(
+            "Data nodes should not exceed the max. allowed overcommit when 'indices.recovery.max_bytes_per_sec' is too large",
+            nodeRecoverySettings().withIndicesRecoveryMaxBytesPerSec(indicesRecoveryMaxBytesPerSec)
+                .withNetworkBandwidth(ByteSizeValue.ofGb(1))
+                .withDiskReadBandwidth(ByteSizeValue.ofMb(500))
+                .withDiskWriteBandwidth(ByteSizeValue.ofMb(250))
+                .withMaxOvercommitFactor(maxOvercommitFactor)
+                .withRoles(randomDataNodeRoles())
+                .withRandomMemory()
+                .build()
+                .getMaxBytesPerSec(),
+            equalTo(
+                ByteSizeValue.ofBytes(
+                    Math.round(Objects.requireNonNullElse(maxOvercommitFactor, 100.d) * ByteSizeValue.ofMb(250).getBytes())
+                )
+            )
+        );
+    }
+
+    public void testMaxBytesPerSecOnDataNodeWithAvailableBandwidths() {
+        assertThat(
+            "Data node should use pre 8.1.0 default because available bandwidths are lower",
+            nodeRecoverySettings().withRoles(randomDataNodeRoles())
+                .withRandomMemory()
+                .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
+                .withDiskReadBandwidth(ByteSizeValue.ofMb(between(10, 50)))
+                .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(10, 50)))
+                .build()
+                .getMaxBytesPerSec(),
+            equalTo(DEFAULT_MAX_BYTES_PER_SEC)
+        );
+
+        final ByteSizeValue indicesRecoveryMaxBytesPerSec = ByteSizeValue.ofMb(randomFrom(100, 250));
+        assertThat(
+            "Data node should use 'indices.recovery.max_bytes_per_sec' setting because available bandwidths are lower",
+            nodeRecoverySettings().withRoles(randomDataNodeRoles())
+                .withRandomMemory()
+                .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
+                .withDiskReadBandwidth(ByteSizeValue.ofMb(between(10, 50)))
+                .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(10, 50)))
+                .withIndicesRecoveryMaxBytesPerSec(indicesRecoveryMaxBytesPerSec)
+                .build()
+                .getMaxBytesPerSec(),
+            equalTo(indicesRecoveryMaxBytesPerSec)
+        );
+
+        final Double factor = randomBoolean() ? randomDoubleBetween(0.5d, 1.0d, true) : null;
+
+        final ByteSizeValue networkBandwidth = ByteSizeValue.ofMb(randomFrom(100, 250));
+        assertThat(
+            "Data node should use available network bandwidth",
+            nodeRecoverySettings().withRoles(randomDataNodeRoles())
+                .withRandomMemory()
+                .withNetworkBandwidth(networkBandwidth)
+                .withDiskReadBandwidth(ByteSizeValue.ofMb(between(250, 500)))
+                .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(250, 500)))
+                .withOperatorDefaultFactor(factor)
+                .build()
+                .getMaxBytesPerSec(),
+            equalTo(
+                ByteSizeValue.ofBytes(Math.round(Objects.requireNonNullElse(factor, DEFAULT_FACTOR_VALUE) * networkBandwidth.getBytes()))
+            )
+        );
+
+        final ByteSizeValue diskReadBandwidth = ByteSizeValue.ofMb(randomFrom(100, 250));
+        assertThat(
+            "Data node should use available disk read bandwidth",
+            nodeRecoverySettings().withRoles(randomDataNodeRoles())
+                .withRandomMemory()
+                .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
+                .withDiskReadBandwidth(diskReadBandwidth)
+                .withDiskWriteBandwidth(ByteSizeValue.ofMb(between(250, 500)))
+                .withOperatorDefaultFactor(factor)
+                .build()
+                .getMaxBytesPerSec(),
+            equalTo(
+                ByteSizeValue.ofBytes(Math.round(Objects.requireNonNullElse(factor, DEFAULT_FACTOR_VALUE) * diskReadBandwidth.getBytes()))
+            )
+        );
+
+        final ByteSizeValue diskWriteBandwidth = ByteSizeValue.ofMb(randomFrom(100, 250));
+        assertThat(
+            "Data node should use available disk write bandwidth",
+            nodeRecoverySettings().withRoles(randomDataNodeRoles())
+                .withRandomMemory()
+                .withNetworkBandwidth(ByteSizeValue.ofGb(between(1, 10)))
+                .withDiskReadBandwidth(ByteSizeValue.ofMb(between(250, 500)))
+                .withDiskWriteBandwidth(diskWriteBandwidth)
+                .withOperatorDefaultFactor(factor)
+                .build()
+                .getMaxBytesPerSec(),
+            equalTo(
+                ByteSizeValue.ofBytes(Math.round(Objects.requireNonNullElse(factor, DEFAULT_FACTOR_VALUE) * diskWriteBandwidth.getBytes()))
+            )
+        );
+    }
+
     public void testDefaultMaxBytesPerSecOnColdOrFrozenNodeWithOldJvm() {
         assertThat(
             "Data nodes with only cold/frozen data roles have a default 40mb rate limit on Java version prior to 14",
@@ -238,15 +394,33 @@ public void testMaxBytesPerSecOnColdOrFrozenNodeWithIndicesRecoveryMaxBytesPerSe
         );
     }
 
-    public static ByteSizeValue randomByteSizeValue() {
+    private static ByteSizeValue randomByteSizeValue() {
         return new ByteSizeValue(randomLongBetween(0L, Long.MAX_VALUE >> 16));
     }
 
-    public static ByteSizeValue randomNonZeroByteSizeValue() {
+    private static ByteSizeValue randomNonZeroByteSizeValue() {
         return new ByteSizeValue(randomLongBetween(1L, Long.MAX_VALUE >> 16));
     }
 
-    static NodeRecoverySettings nodeRecoverySettings() {
+    private static Set<String> randomDataNodeRoles() {
+        final Set<String> roles = new HashSet<>(randomSubsetOf(randomIntBetween(1, 4), "data", "data_hot", "data_warm", "data_content"));
+        roles.addAll(randomSubsetOf(Set.of("data_cold", "data_frozen")));
+        if (randomBoolean()) {
+            roles.addAll(
+                randomSubsetOf(
+                    DiscoveryNodeRole.roles()
+                        .stream()
+                        .filter(role -> role != DiscoveryNodeRole.VOTING_ONLY_NODE_ROLE)
+                        .filter(role -> role.canContainData() == false)
+                        .map(DiscoveryNodeRole::roleName)
+                        .collect(Collectors.toSet())
+                )
+            );
+        }
+        return roles;
+    }
+
+    private static NodeRecoverySettings nodeRecoverySettings() {
         return new NodeRecoverySettings();
     }
 
@@ -255,7 +429,12 @@ private static class NodeRecoverySettings {
 
         private Set<String> roles;
         private ByteSizeValue physicalMemory;
         private @Nullable String javaVersion;
+        private @Nullable ByteSizeValue networkBandwidth;
+        private @Nullable ByteSizeValue diskReadBandwidth;
+        private @Nullable ByteSizeValue diskWriteBandwidth;
         private @Nullable ByteSizeValue indicesRecoveryMaxBytesPerSec;
+        private @Nullable Double operatorDefaultFactor;
+        private @Nullable Double maxOvercommitFactor;
 
         NodeRecoverySettings withRole(String role) {
             this.roles = Set.of(Objects.requireNonNull(role));
@@ -286,6 +465,47 @@ NodeRecoverySettings withIndicesRecoveryMaxBytesPerSec(ByteSizeValue indicesReco
             return this;
         }
 
+        NodeRecoverySettings withRandomIndicesRecoveryMaxBytesPerSec() {
+            if (randomBoolean()) {
+                withIndicesRecoveryMaxBytesPerSec(randomByteSizeValue());
+            }
+            return this;
+        }
+
+        NodeRecoverySettings withNetworkBandwidth(ByteSizeValue networkBandwidth) {
+            this.networkBandwidth = networkBandwidth;
+            return this;
+        }
+
+        NodeRecoverySettings withDiskReadBandwidth(ByteSizeValue diskReadBandwidth) {
+            this.diskReadBandwidth = diskReadBandwidth;
+            return this;
+        }
+
+        NodeRecoverySettings withDiskWriteBandwidth(ByteSizeValue diskWriteBandwidth) {
+            this.diskWriteBandwidth = diskWriteBandwidth;
+            return this;
+        }
+
+        NodeRecoverySettings withRandomBandwidths() {
+            if (randomBoolean()) {
+                withNetworkBandwidth(randomNonZeroByteSizeValue());
+                withDiskReadBandwidth(randomNonZeroByteSizeValue());
+                withDiskWriteBandwidth(randomNonZeroByteSizeValue());
+            }
+            return this;
+        }
+
+        NodeRecoverySettings withOperatorDefaultFactor(Double factor) {
+            this.operatorDefaultFactor = factor;
+            return this;
+        }
+
+        NodeRecoverySettings withMaxOvercommitFactor(Double factor) {
+            this.maxOvercommitFactor = factor;
+            return this;
+        }
+
         RecoverySettings build() {
             final Settings.Builder settings = Settings.builder();
             settings.put(TOTAL_PHYSICAL_MEMORY_OVERRIDING_TEST_SETTING.getKey(), Objects.requireNonNull(physicalMemory));
@@ -298,6 +518,21 @@ RecoverySettings build() {
             if (indicesRecoveryMaxBytesPerSec != null) {
                 settings.put(INDICES_RECOVERY_MAX_BYTES_PER_SEC_SETTING.getKey(), indicesRecoveryMaxBytesPerSec);
             }
+            if (networkBandwidth != null) {
+                settings.put(NODE_BANDWIDTH_RECOVERY_NETWORK_SETTING.getKey(), networkBandwidth);
+            }
+            if (diskReadBandwidth != null) {
+                settings.put(NODE_BANDWIDTH_RECOVERY_DISK_READ_SETTING.getKey(), diskReadBandwidth);
+            }
+            if (diskWriteBandwidth != null) {
+                settings.put(NODE_BANDWIDTH_RECOVERY_DISK_WRITE_SETTING.getKey(), diskWriteBandwidth);
+            }
+            if (operatorDefaultFactor != null) {
+                settings.put(NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_SETTING.getKey(), operatorDefaultFactor);
+            }
+            if (maxOvercommitFactor != null) {
+                settings.put(NODE_BANDWIDTH_RECOVERY_OPERATOR_FACTOR_MAX_OVERCOMMIT_SETTING.getKey(), maxOvercommitFactor);
+            }
             return new RecoverySettings(settings.build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
         }
     }
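
Usage note: the three node.bandwidth.recovery settings must be defined
together; otherwise RecoverySettings fails with the IllegalArgumentException
built in validateNodeBandwidthRecoverySettings() above. A minimal sketch of
wiring them up, mirroring what NodeRecoverySettings#build() does in the test
(the bandwidth values are illustrative):

    import org.elasticsearch.common.settings.ClusterSettings;
    import org.elasticsearch.common.settings.Settings;
    import org.elasticsearch.indices.recovery.RecoverySettings;

    public class RecoverySettingsWiringExample {
        public static void main(String[] args) {
            // All three bandwidth settings are set, so validation passes and the
            // computed rate limit is logged when RecoverySettings is constructed.
            final Settings settings = Settings.builder()
                .put("node.bandwidth.recovery.network", "1gb")
                .put("node.bandwidth.recovery.disk.read", "500mb")
                .put("node.bandwidth.recovery.disk.write", "250mb")
                .build();
            final RecoverySettings recoverySettings = new RecoverySettings(
                settings,
                new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
            );
            // With the values above and the default 0.4 factor: min(1gb, 500mb, 250mb) * 0.4
            System.out.println(recoverySettings.getMaxBytesPerSec()); // prints: 100mb
        }
    }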