From ca40ba4646b3b5298c2c9e6e652df596048468e5 Mon Sep 17 00:00:00 2001 From: Rahul Karajgikar <50844303+rahulkarajgikar@users.noreply.github.com> Date: Wed, 23 Oct 2024 08:51:21 +0530 Subject: [PATCH] Make multiple settings dynamic for tuning on larger clusters (#16347) Signed-off-by: Rahul Karajgikar --- CHANGELOG.md | 1 + .../cluster/coordination/Coordinator.java | 12 +- .../ElectionSchedulerFactory.java | 2 +- .../coordination/FollowersChecker.java | 12 +- .../cluster/coordination/LagDetector.java | 12 +- .../gateway/ShardsBatchGatewayAllocator.java | 11 +- .../CoordinationCheckerSettingsTests.java | 105 +++++++++++++++++- .../coordination/LagDetectorTests.java | 6 +- .../ShardsBatchGatewayAllocatorTests.java | 66 +++++++++++ 9 files changed, 212 insertions(+), 15 deletions(-) create mode 100644 server/src/test/java/org/opensearch/gateway/ShardsBatchGatewayAllocatorTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index c1485de3de2ee..0f95cb2484984 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -67,6 +67,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Code cleanup: Remove ApproximateIndexOrDocValuesQuery ([#16273](https://github.com/opensearch-project/OpenSearch/pull/16273)) - Optimise clone operation for incremental full cluster snapshots ([#16296](https://github.com/opensearch-project/OpenSearch/pull/16296)) - Update last seen cluster state in the commit phase ([#16215](https://github.com/opensearch-project/OpenSearch/pull/16215)) +- Make multiple settings dynamic for tuning on larger clusters([#16347](https://github.com/opensearch-project/OpenSearch/pull/16347)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 684a6b0c3eae5..6fee2037501e7 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -141,7 +141,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery "cluster.publish.timeout", TimeValue.timeValueMillis(30000), TimeValue.timeValueMillis(1), - Setting.Property.NodeScope + Setting.Property.NodeScope, + Setting.Property.Dynamic ); private final Settings settings; @@ -164,7 +165,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery private final Random random; private final ElectionSchedulerFactory electionSchedulerFactory; private final SeedHostsResolver configuredHostsResolver; - private final TimeValue publishTimeout; + private TimeValue publishTimeout; private final TimeValue publishInfoTimeout; private final PublicationTransportHandler publicationHandler; private final LeaderChecker leaderChecker; @@ -247,6 +248,7 @@ public Coordinator( this.lastJoin = Optional.empty(); this.joinAccumulator = new InitialJoinAccumulator(); this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(PUBLISH_TIMEOUT_SETTING, this::setPublishTimeout); this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings); this.random = random; this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool()); @@ -301,6 +303,7 @@ public Coordinator( ); this.lagDetector = new LagDetector( settings, + clusterSettings, transportService.getThreadPool(), n -> removeNode(n, "lagging"), transportService::getLocalNode @@ -319,6 +322,10 @@ public Coordinator( this.clusterSettings = clusterSettings; } + private void setPublishTimeout(TimeValue publishTimeout) { + this.publishTimeout = publishTimeout; + } + private ClusterFormationState getClusterFormationState() { return new ClusterFormationState( settings, @@ -1669,7 +1676,6 @@ public void onNodeAck(DiscoveryNode node, Exception e) { this.localNodeAckEvent = localNodeAckEvent; this.ackListener = ackListener; this.publishListener = publishListener; - this.timeoutHandler = singleNodeDiscovery ? null : transportService.getThreadPool().schedule(new Runnable() { @Override public void run() { diff --git a/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java b/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java index 828db5864d28b..1cc88c71c609b 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/ElectionSchedulerFactory.java @@ -214,7 +214,7 @@ protected void doRun() { if (isClosed.get()) { logger.debug("{} not starting election", this); } else { - logger.debug("{} starting election", this); + logger.debug("{} starting election with duration {}", this, duration); scheduleNextElection(duration, scheduledRunnable); scheduledRunnable.run(); } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java index 2ec0dabd91786..ca414ef7c4fc8 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/FollowersChecker.java @@ -92,7 +92,8 @@ public class FollowersChecker { "cluster.fault_detection.follower_check.interval", TimeValue.timeValueMillis(1000), TimeValue.timeValueMillis(100), - Setting.Property.NodeScope + Setting.Property.NodeScope, + Setting.Property.Dynamic ); // the timeout for each check sent to each node @@ -100,7 +101,7 @@ public class FollowersChecker { "cluster.fault_detection.follower_check.timeout", TimeValue.timeValueMillis(10000), TimeValue.timeValueMillis(1), - TimeValue.timeValueMillis(60000), + TimeValue.timeValueMillis(150000), Setting.Property.NodeScope, Setting.Property.Dynamic ); @@ -115,7 +116,7 @@ public class FollowersChecker { private final Settings settings; - private final TimeValue followerCheckInterval; + private TimeValue followerCheckInterval; private TimeValue followerCheckTimeout; private final int followerCheckRetryCount; private final BiConsumer onNodeFailure; @@ -148,6 +149,7 @@ public FollowersChecker( followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings); followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings); followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_INTERVAL_SETTING, this::setFollowerCheckInterval); clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_TIMEOUT_SETTING, this::setFollowerCheckTimeout); updateFastResponseState(0, Mode.CANDIDATE); transportService.registerRequestHandler( @@ -167,6 +169,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti this.clusterManagerMetrics = clusterManagerMetrics; } + private void setFollowerCheckInterval(TimeValue followerCheckInterval) { + this.followerCheckInterval = followerCheckInterval; + } + private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) { this.followerCheckTimeout = followerCheckTimeout; } diff --git a/server/src/main/java/org/opensearch/cluster/coordination/LagDetector.java b/server/src/main/java/org/opensearch/cluster/coordination/LagDetector.java index eeb0800663d0a..969c121dc87cf 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/LagDetector.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/LagDetector.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; @@ -68,10 +69,11 @@ public class LagDetector { "cluster.follower_lag.timeout", TimeValue.timeValueMillis(90000), TimeValue.timeValueMillis(1), - Setting.Property.NodeScope + Setting.Property.NodeScope, + Setting.Property.Dynamic ); - private final TimeValue clusterStateApplicationTimeout; + private TimeValue clusterStateApplicationTimeout; private final Consumer onLagDetected; private final Supplier localNodeSupplier; private final ThreadPool threadPool; @@ -79,12 +81,14 @@ public class LagDetector { public LagDetector( final Settings settings, + final ClusterSettings clusterSettings, final ThreadPool threadPool, final Consumer onLagDetected, final Supplier localNodeSupplier ) { this.threadPool = threadPool; this.clusterStateApplicationTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING, this::setFollowerLagTimeout); this.onLagDetected = onLagDetected; this.localNodeSupplier = localNodeSupplier; } @@ -136,6 +140,10 @@ public String toString() { } } + private void setFollowerLagTimeout(TimeValue followerCheckLagTimeout) { + this.clusterStateApplicationTimeout = followerCheckLagTimeout; + } + @Override public String toString() { return "LagDetector{" diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index d7c0a66ba3424..9c38ea1df8a41 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -72,7 +72,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { public static final String ALLOCATOR_NAME = "shards_batch_gateway_allocator"; private static final Logger logger = LogManager.getLogger(ShardsBatchGatewayAllocator.class); - private final long maxBatchSize; + private long maxBatchSize; private static final short DEFAULT_SHARD_BATCH_SIZE = 2000; public static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY = @@ -93,7 +93,8 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { DEFAULT_SHARD_BATCH_SIZE, 1, 10000, - Setting.Property.NodeScope + Setting.Property.NodeScope, + Setting.Property.Dynamic ); /** @@ -172,6 +173,7 @@ public ShardsBatchGatewayAllocator( this.batchStartedAction = batchStartedAction; this.batchStoreAction = batchStoreAction; this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings); + clusterSettings.addSettingsUpdateConsumer(GATEWAY_ALLOCATOR_BATCH_SIZE, this::setMaxBatchSize); this.primaryShardsBatchGatewayAllocatorTimeout = PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings); clusterSettings.addSettingsUpdateConsumer(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setPrimaryBatchAllocatorTimeout); this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings); @@ -402,6 +404,7 @@ else if (shardRouting.primary() == primary) { Iterator iterator = newShardsToBatch.values().iterator(); assert maxBatchSize > 0 : "Shards batch size must be greater than 0"; + logger.debug("Using async fetch batch size {}", maxBatchSize); long batchSize = maxBatchSize; Map perBatchShards = new HashMap<>(); while (iterator.hasNext()) { @@ -906,6 +909,10 @@ public int getNumberOfStoreShardBatches() { return batchIdToStoreShardBatch.size(); } + private void setMaxBatchSize(long maxBatchSize) { + this.maxBatchSize = maxBatchSize; + } + protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatewayAllocatorTimeout) { this.primaryShardsBatchGatewayAllocatorTimeout = primaryShardsBatchGatewayAllocatorTimeout; } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationCheckerSettingsTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationCheckerSettingsTests.java index 56bd2d94dce84..8e8e71ad33e75 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinationCheckerSettingsTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinationCheckerSettingsTests.java @@ -14,7 +14,10 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchSingleNodeTestCase; +import static org.opensearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING; +import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING; import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING; +import static org.opensearch.cluster.coordination.LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING; import static org.opensearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING; import static org.opensearch.common.unit.TimeValue.timeValueSeconds; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -42,10 +45,10 @@ public void testFollowerCheckTimeoutValueUpdate() { public void testFollowerCheckTimeoutMaxValue() { Setting setting1 = FOLLOWER_CHECK_TIMEOUT_SETTING; - Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "61s").build(); + Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "151s").build(); assertThrows( - "failed to parse value [61s] for setting [" + setting1.getKey() + "], must be <= [60000ms]", + "failed to parse value [151s] for setting [" + setting1.getKey() + "], must be <= [150000ms]", IllegalArgumentException.class, () -> { client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet(); @@ -66,6 +69,38 @@ public void testFollowerCheckTimeoutMinValue() { ); } + public void testFollowerCheckIntervalValueUpdate() { + Setting setting1 = FOLLOWER_CHECK_INTERVAL_SETTING; + Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "10s").build(); + try { + ClusterUpdateSettingsResponse response = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(timeSettings1) + .execute() + .actionGet(); + assertAcked(response); + assertEquals(timeValueSeconds(10), setting1.get(response.getPersistentSettings())); + } finally { + // cleanup + timeSettings1 = Settings.builder().putNull(setting1.getKey()).build(); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet(); + } + } + + public void testFollowerCheckIntervalMinValue() { + Setting setting1 = FOLLOWER_CHECK_INTERVAL_SETTING; + Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "10ms").build(); + + assertThrows( + "failed to parse value [10ms] for setting [" + setting1.getKey() + "], must be >= [100ms]", + IllegalArgumentException.class, + () -> { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet(); + } + ); + } + public void testLeaderCheckTimeoutValueUpdate() { Setting setting1 = LEADER_CHECK_TIMEOUT_SETTING; Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "60s").build(); @@ -110,4 +145,70 @@ public void testLeaderCheckTimeoutMinValue() { } ); } + + public void testClusterPublishTimeoutValueUpdate() { + Setting setting1 = PUBLISH_TIMEOUT_SETTING; + Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "60s").build(); + try { + ClusterUpdateSettingsResponse response = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(timeSettings1) + .execute() + .actionGet(); + assertAcked(response); + assertEquals(timeValueSeconds(60), setting1.get(response.getPersistentSettings())); + } finally { + // cleanup + timeSettings1 = Settings.builder().putNull(setting1.getKey()).build(); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet(); + } + } + + public void testClusterPublishTimeoutMinValue() { + Setting setting1 = PUBLISH_TIMEOUT_SETTING; + Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "0s").build(); + + assertThrows( + "failed to parse value [0s] for setting [" + setting1.getKey() + "], must be >= [1ms]", + IllegalArgumentException.class, + () -> { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet(); + } + ); + } + + public void testLagDetectorTimeoutUpdate() { + Setting setting1 = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING; + Settings lagDetectorTimeout = Settings.builder().put(setting1.getKey(), "30s").build(); + try { + ClusterUpdateSettingsResponse response = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(lagDetectorTimeout) + .execute() + .actionGet(); + + assertAcked(response); + assertEquals(timeValueSeconds(30), setting1.get(response.getPersistentSettings())); + } finally { + // cleanup + lagDetectorTimeout = Settings.builder().putNull(setting1.getKey()).build(); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(lagDetectorTimeout).execute().actionGet(); + } + } + + public void testLagDetectorTimeoutMinValue() { + Setting setting1 = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING; + Settings lagDetectorTimeout = Settings.builder().put(setting1.getKey(), "0s").build(); + + assertThrows( + "failed to parse value [0s] for setting [" + setting1.getKey() + "], must be >= [1ms]", + IllegalArgumentException.class, + () -> { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(lagDetectorTimeout).execute().actionGet(); + } + ); + } + } diff --git a/server/src/test/java/org/opensearch/cluster/coordination/LagDetectorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/LagDetectorTests.java index adffa27e9bc1a..315e5d6224227 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/LagDetectorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/LagDetectorTests.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.coordination; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.test.OpenSearchTestCase; @@ -70,8 +71,9 @@ public void setupFixture() { } else { followerLagTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(Settings.EMPTY); } - - lagDetector = new LagDetector(settingsBuilder.build(), deterministicTaskQueue.getThreadPool(), failedNodes::add, () -> localNode); + Settings settings = settingsBuilder.build(); + final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + lagDetector = new LagDetector(settings, clusterSettings, deterministicTaskQueue.getThreadPool(), failedNodes::add, () -> localNode); localNode = CoordinationStateTests.createNode("local"); node1 = CoordinationStateTests.createNode("node1"); diff --git a/server/src/test/java/org/opensearch/gateway/ShardsBatchGatewayAllocatorTests.java b/server/src/test/java/org/opensearch/gateway/ShardsBatchGatewayAllocatorTests.java new file mode 100644 index 0000000000000..59fb6e2b940ba --- /dev/null +++ b/server/src/test/java/org/opensearch/gateway/ShardsBatchGatewayAllocatorTests.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.gateway; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchSingleNodeTestCase; + +import static org.opensearch.gateway.ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +public class ShardsBatchGatewayAllocatorTests extends OpenSearchSingleNodeTestCase { + public void testBatchSizeValueUpdate() { + Setting setting1 = GATEWAY_ALLOCATOR_BATCH_SIZE; + Settings batchSizeSetting = Settings.builder().put(setting1.getKey(), "3000").build(); + try { + ClusterUpdateSettingsResponse response = client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(batchSizeSetting) + .execute() + .actionGet(); + + assertAcked(response); + assertThat(setting1.get(response.getPersistentSettings()), equalTo(3000L)); + } finally { + // cleanup + batchSizeSetting = Settings.builder().putNull(setting1.getKey()).build(); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(batchSizeSetting).execute().actionGet(); + } + } + + public void testBatchSizeMaxValue() { + Setting setting1 = GATEWAY_ALLOCATOR_BATCH_SIZE; + Settings batchSizeSetting = Settings.builder().put(setting1.getKey(), "11000").build(); + + assertThrows( + "failed to parse value [11000] for setting [" + setting1.getKey() + "], must be <= [10000]", + IllegalArgumentException.class, + () -> { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(batchSizeSetting).execute().actionGet(); + } + ); + } + + public void testBatchSizeMinValue() { + Setting setting1 = GATEWAY_ALLOCATOR_BATCH_SIZE; + Settings batchSizeSetting = Settings.builder().put(setting1.getKey(), "0").build(); + + assertThrows( + "failed to parse value [0] for setting [" + setting1.getKey() + "], must be >= [1]", + IllegalArgumentException.class, + () -> { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(batchSizeSetting).execute().actionGet(); + } + ); + } +}