Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make multiple settings dynamic for tuning on larger clusters #16347

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Code cleanup: Remove ApproximateIndexOrDocValuesQuery ([#16273](https://github.com/opensearch-project/OpenSearch/pull/16273))
- Optimise clone operation for incremental full cluster snapshots ([#16296](https://github.com/opensearch-project/OpenSearch/pull/16296))
- Update last seen cluster state in the commit phase ([#16215](https://github.com/opensearch-project/OpenSearch/pull/16215))
- Make multiple settings dynamic for tuning on larger clusters([#16347](https://github.com/opensearch-project/OpenSearch/pull/16347))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
"cluster.publish.timeout",
TimeValue.timeValueMillis(30000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final Settings settings;
Expand All @@ -164,7 +165,7 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private final Random random;
private final ElectionSchedulerFactory electionSchedulerFactory;
private final SeedHostsResolver configuredHostsResolver;
private final TimeValue publishTimeout;
private TimeValue publishTimeout;
private final TimeValue publishInfoTimeout;
private final PublicationTransportHandler publicationHandler;
private final LeaderChecker leaderChecker;
Expand Down Expand Up @@ -246,6 +247,7 @@ public Coordinator(
this.lastJoin = Optional.empty();
this.joinAccumulator = new InitialJoinAccumulator();
this.publishTimeout = PUBLISH_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(PUBLISH_TIMEOUT_SETTING, this::setPublishTimeout);
this.publishInfoTimeout = PUBLISH_INFO_TIMEOUT_SETTING.get(settings);
this.random = random;
this.electionSchedulerFactory = new ElectionSchedulerFactory(settings, random, transportService.getThreadPool());
Expand Down Expand Up @@ -300,6 +302,7 @@ public Coordinator(
);
this.lagDetector = new LagDetector(
settings,
clusterSettings,
transportService.getThreadPool(),
n -> removeNode(n, "lagging"),
transportService::getLocalNode
Expand All @@ -317,6 +320,10 @@ public Coordinator(
this.remoteClusterStateService = remoteClusterStateService;
}

private void setPublishTimeout(TimeValue publishTimeout) {
this.publishTimeout = publishTimeout;
}

private ClusterFormationState getClusterFormationState() {
return new ClusterFormationState(
settings,
Expand Down Expand Up @@ -1667,7 +1674,6 @@ public void onNodeAck(DiscoveryNode node, Exception e) {
this.localNodeAckEvent = localNodeAckEvent;
this.ackListener = ackListener;
this.publishListener = publishListener;

this.timeoutHandler = singleNodeDiscovery ? null : transportService.getThreadPool().schedule(new Runnable() {
@Override
public void run() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ protected void doRun() {
if (isClosed.get()) {
logger.debug("{} not starting election", this);
} else {
logger.debug("{} starting election", this);
logger.debug("{} starting election with duration {}", this, duration);
scheduleNextElection(duration, scheduledRunnable);
scheduledRunnable.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,16 @@ public class FollowersChecker {
"cluster.fault_detection.follower_check.interval",
TimeValue.timeValueMillis(1000),
TimeValue.timeValueMillis(100),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

// the timeout for each check sent to each node
public static final Setting<TimeValue> FOLLOWER_CHECK_TIMEOUT_SETTING = Setting.timeSetting(
"cluster.fault_detection.follower_check.timeout",
TimeValue.timeValueMillis(10000),
TimeValue.timeValueMillis(1),
TimeValue.timeValueMillis(60000),
TimeValue.timeValueMillis(150000),
shwetathareja marked this conversation as resolved.
Show resolved Hide resolved
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Expand All @@ -115,7 +116,7 @@ public class FollowersChecker {

private final Settings settings;

private final TimeValue followerCheckInterval;
private TimeValue followerCheckInterval;
private TimeValue followerCheckTimeout;
private final int followerCheckRetryCount;
private final BiConsumer<DiscoveryNode, String> onNodeFailure;
Expand Down Expand Up @@ -148,6 +149,7 @@ public FollowersChecker(
followerCheckInterval = FOLLOWER_CHECK_INTERVAL_SETTING.get(settings);
followerCheckTimeout = FOLLOWER_CHECK_TIMEOUT_SETTING.get(settings);
followerCheckRetryCount = FOLLOWER_CHECK_RETRY_COUNT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_INTERVAL_SETTING, this::setFollowerCheckInterval);
clusterSettings.addSettingsUpdateConsumer(FOLLOWER_CHECK_TIMEOUT_SETTING, this::setFollowerCheckTimeout);
updateFastResponseState(0, Mode.CANDIDATE);
transportService.registerRequestHandler(
Expand All @@ -167,6 +169,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
this.clusterManagerMetrics = clusterManagerMetrics;
}

private void setFollowerCheckInterval(TimeValue followerCheckInterval) {
this.followerCheckInterval = followerCheckInterval;
}

private void setFollowerCheckTimeout(TimeValue followerCheckTimeout) {
this.followerCheckTimeout = followerCheckTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -68,23 +69,26 @@ public class LagDetector {
"cluster.follower_lag.timeout",
TimeValue.timeValueMillis(90000),
TimeValue.timeValueMillis(1),
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private final TimeValue clusterStateApplicationTimeout;
private TimeValue clusterStateApplicationTimeout;
private final Consumer<DiscoveryNode> onLagDetected;
private final Supplier<DiscoveryNode> localNodeSupplier;
private final ThreadPool threadPool;
private final Map<DiscoveryNode, NodeAppliedStateTracker> appliedStateTrackersByNode = newConcurrentMap();

public LagDetector(
final Settings settings,
final ClusterSettings clusterSettings,
final ThreadPool threadPool,
final Consumer<DiscoveryNode> onLagDetected,
final Supplier<DiscoveryNode> localNodeSupplier
) {
this.threadPool = threadPool;
this.clusterStateApplicationTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING, this::setFollowerLagTimeout);
this.onLagDetected = onLagDetected;
this.localNodeSupplier = localNodeSupplier;
}
Expand Down Expand Up @@ -136,6 +140,10 @@ public String toString() {
}
}

private void setFollowerLagTimeout(TimeValue followerCheckLagTimeout) {
this.clusterStateApplicationTimeout = followerCheckLagTimeout;
}

@Override
public String toString() {
return "LagDetector{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {

public static final String ALLOCATOR_NAME = "shards_batch_gateway_allocator";
private static final Logger logger = LogManager.getLogger(ShardsBatchGatewayAllocator.class);
private final long maxBatchSize;
private long maxBatchSize;
private static final short DEFAULT_SHARD_BATCH_SIZE = 2000;

public static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY =
Expand All @@ -93,7 +93,8 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator {
DEFAULT_SHARD_BATCH_SIZE,
1,
10000,
Setting.Property.NodeScope
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/**
Expand Down Expand Up @@ -172,6 +173,7 @@ public ShardsBatchGatewayAllocator(
this.batchStartedAction = batchStartedAction;
this.batchStoreAction = batchStoreAction;
this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings);
clusterSettings.addSettingsUpdateConsumer(GATEWAY_ALLOCATOR_BATCH_SIZE, this::setMaxBatchSize);
this.primaryShardsBatchGatewayAllocatorTimeout = PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING, this::setPrimaryBatchAllocatorTimeout);
this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings);
Expand Down Expand Up @@ -402,6 +404,7 @@ else if (shardRouting.primary() == primary) {
Iterator<ShardRouting> iterator = newShardsToBatch.values().iterator();
assert maxBatchSize > 0 : "Shards batch size must be greater than 0";

logger.debug("Using async fetch batch size {}", maxBatchSize);
long batchSize = maxBatchSize;
Map<ShardId, ShardEntry> perBatchShards = new HashMap<>();
while (iterator.hasNext()) {
Expand Down Expand Up @@ -906,6 +909,10 @@ public int getNumberOfStoreShardBatches() {
return batchIdToStoreShardBatch.size();
}

private void setMaxBatchSize(long maxBatchSize) {
this.maxBatchSize = maxBatchSize;
}

protected void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatewayAllocatorTimeout) {
this.primaryShardsBatchGatewayAllocatorTimeout = primaryShardsBatchGatewayAllocatorTimeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.OpenSearchSingleNodeTestCase;

import static org.opensearch.cluster.coordination.Coordinator.PUBLISH_TIMEOUT_SETTING;
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_INTERVAL_SETTING;
import static org.opensearch.cluster.coordination.FollowersChecker.FOLLOWER_CHECK_TIMEOUT_SETTING;
import static org.opensearch.cluster.coordination.LagDetector.CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING;
import static org.opensearch.cluster.coordination.LeaderChecker.LEADER_CHECK_TIMEOUT_SETTING;
import static org.opensearch.common.unit.TimeValue.timeValueSeconds;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -42,10 +45,10 @@ public void testFollowerCheckTimeoutValueUpdate() {

public void testFollowerCheckTimeoutMaxValue() {
Setting<TimeValue> setting1 = FOLLOWER_CHECK_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "61s").build();
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "151s").build();

assertThrows(
"failed to parse value [61s] for setting [" + setting1.getKey() + "], must be <= [60000ms]",
"failed to parse value [151s] for setting [" + setting1.getKey() + "], must be <= [150000ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
Expand All @@ -66,6 +69,38 @@ public void testFollowerCheckTimeoutMinValue() {
);
}

public void testFollowerCheckIntervalValueUpdate() {
Setting<TimeValue> setting1 = FOLLOWER_CHECK_INTERVAL_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "10s").build();
try {
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(timeSettings1)
.execute()
.actionGet();
assertAcked(response);
assertEquals(timeValueSeconds(10), setting1.get(response.getPersistentSettings()));
} finally {
// cleanup
timeSettings1 = Settings.builder().putNull(setting1.getKey()).build();
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
}

public void testFollowerCheckIntervalMinValue() {
Setting<TimeValue> setting1 = FOLLOWER_CHECK_INTERVAL_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "10ms").build();

assertThrows(
"failed to parse value [10ms] for setting [" + setting1.getKey() + "], must be >= [100ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
);
}

public void testLeaderCheckTimeoutValueUpdate() {
Setting<TimeValue> setting1 = LEADER_CHECK_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "60s").build();
Expand Down Expand Up @@ -110,4 +145,70 @@ public void testLeaderCheckTimeoutMinValue() {
}
);
}

public void testClusterPublishTimeoutValueUpdate() {
Setting<TimeValue> setting1 = PUBLISH_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "60s").build();
try {
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(timeSettings1)
.execute()
.actionGet();
assertAcked(response);
assertEquals(timeValueSeconds(60), setting1.get(response.getPersistentSettings()));
} finally {
// cleanup
timeSettings1 = Settings.builder().putNull(setting1.getKey()).build();
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
}

public void testClusterPublishTimeoutMinValue() {
Setting<TimeValue> setting1 = PUBLISH_TIMEOUT_SETTING;
Settings timeSettings1 = Settings.builder().put(setting1.getKey(), "0s").build();

assertThrows(
"failed to parse value [0s] for setting [" + setting1.getKey() + "], must be >= [1ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(timeSettings1).execute().actionGet();
}
);
}

public void testLagDetectorTimeoutUpdate() {
Setting<TimeValue> setting1 = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING;
Settings lagDetectorTimeout = Settings.builder().put(setting1.getKey(), "30s").build();
try {
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(lagDetectorTimeout)
.execute()
.actionGet();

assertAcked(response);
assertEquals(timeValueSeconds(30), setting1.get(response.getPersistentSettings()));
} finally {
// cleanup
lagDetectorTimeout = Settings.builder().putNull(setting1.getKey()).build();
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(lagDetectorTimeout).execute().actionGet();
}
}

public void testLagDetectorTimeoutMinValue() {
Setting<TimeValue> setting1 = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING;
Settings lagDetectorTimeout = Settings.builder().put(setting1.getKey(), "0s").build();

assertThrows(
"failed to parse value [0s] for setting [" + setting1.getKey() + "], must be >= [1ms]",
IllegalArgumentException.class,
() -> {
client().admin().cluster().prepareUpdateSettings().setPersistentSettings(lagDetectorTimeout).execute().actionGet();
}
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
package org.opensearch.cluster.coordination;

import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.OpenSearchTestCase;
Expand Down Expand Up @@ -70,8 +71,9 @@ public void setupFixture() {
} else {
followerLagTimeout = CLUSTER_FOLLOWER_LAG_TIMEOUT_SETTING.get(Settings.EMPTY);
}

lagDetector = new LagDetector(settingsBuilder.build(), deterministicTaskQueue.getThreadPool(), failedNodes::add, () -> localNode);
Settings settings = settingsBuilder.build();
final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
lagDetector = new LagDetector(settings, clusterSettings, deterministicTaskQueue.getThreadPool(), failedNodes::add, () -> localNode);

localNode = CoordinationStateTests.createNode("local");
node1 = CoordinationStateTests.createNode("node1");
Expand Down
Loading
Loading