From ea1cc9d054b2d7cf0046438ecc57777e29112c2b Mon Sep 17 00:00:00 2001 From: Dhwanil Patel Date: Tue, 27 Dec 2022 18:56:15 +0530 Subject: [PATCH] Add version check during task submission for bwc for static threshold setting (#5633) * Add version check during task submission for bwc for static threshold setting Signed-off-by: Dhwanil Patel --- CHANGELOG.md | 1 + .../org/opensearch/OpenSearchException.java | 2 +- .../service/ClusterManagerTaskThrottler.java | 30 ++++++++++++- .../ClusterManagerTaskThrottlerTests.java | 42 +++++++++++++++++++ 4 files changed, 73 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 346cf1c55cbeb..9ba7bfbb27bb0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -112,6 +112,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix case sensitivity for wildcard queries ([#5462](https://github.com/opensearch-project/OpenSearch/pull/5462)) - Apply cluster manager throttling settings during bootstrap ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524)) - Update thresholds map when cluster manager throttling setting is removed ([#5524](https://github.com/opensearch-project/OpenSearch/pull/5524)) +- Fix backward compatibility for static cluster manager throttling threshold setting ([#5633](https://github.com/opensearch-project/OpenSearch/pull/5633)) ### Security [Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.4...HEAD diff --git a/server/src/main/java/org/opensearch/OpenSearchException.java b/server/src/main/java/org/opensearch/OpenSearchException.java index 78f6b50b3a039..78e5dc044cbd8 100644 --- a/server/src/main/java/org/opensearch/OpenSearchException.java +++ b/server/src/main/java/org/opensearch/OpenSearchException.java @@ -1613,7 +1613,7 @@ private enum OpenSearchExceptionHandle { ClusterManagerThrottlingException.class, ClusterManagerThrottlingException::new, 165, - Version.V_2_4_0 + Version.V_2_5_0 ), SNAPSHOT_IN_USE_DELETION_EXCEPTION( 
SnapshotInUseDeletionException.class, diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java index db2be505b2fbb..726963fe4b37d 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -22,6 +22,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; /** @@ -51,6 +52,11 @@ public class ClusterManagerTaskThrottler implements TaskBatcherListener { private final ConcurrentMap tasksThreshold; private final Supplier minNodeVersionSupplier; + // Once all nodes are greater than or equal 2.5.0 version, then only it will start throttling. + // During upgrade as well, it will wait for all older version nodes to leave the cluster before starting throttling. + // This is needed specifically for static setting to enable throttling. + private AtomicBoolean startThrottling = new AtomicBoolean(); + public ClusterManagerTaskThrottler( final Settings settings, final ClusterSettings clusterSettings, @@ -168,7 +174,7 @@ public void onBeginSubmit(List tasks) { int size = tasks.size(); if (clusterManagerThrottlingKey.isThrottlingEnabled()) { Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey()); - if (threshold != null && (count + size > threshold)) { + if (threshold != null && shouldThrottle(threshold, count, size)) { clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size); logger.warn( "Throwing Throttling Exception for [{}]. 
Trying to add [{}] tasks to queue, limit is set to [{}]", @@ -185,6 +191,28 @@ public void onBeginSubmit(List tasks) { }); } + /** + * If throttling thresholds are set via static setting, it will update the threshold map. + * It may start throwing throttling exception to older nodes in cluster. + * Older version nodes will not be equipped to handle the throttling exception and + * this may result in unexpected behavior where internal tasks would start failing without any retries. + * + * For every task submission request, it will validate if nodes version is greater or equal to 2.5.0 and set the startThrottling flag. + * Once the startThrottling flag is set, it will not perform check for next set of tasks. + */ + private boolean shouldThrottle(Long threshold, Long count, int size) { + if (!startThrottling.get()) { + if (minNodeVersionSupplier.get().compareTo(Version.V_2_5_0) >= 0) { + startThrottling.compareAndSet(false, true); + logger.info("Starting cluster manager throttling as all nodes are higher than or equal to 2.5.0"); + } else { + logger.info("Skipping cluster manager throttling as at least one node < 2.5.0 is present in cluster"); + return false; + } + } + return count + size > threshold; + } + @Override public void onSubmitFailure(List tasks) { reduceTaskCount(tasks); diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java index 63ed54ddf5238..0acdbffe3dc4f 100644 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java @@ -324,6 +324,48 @@ public void testThrottlingForDisabledThrottlingTask() { assertEquals(0L, throttlingStats.getThrottlingCount(taskKey)); } + public void testThrottlingForInitialStaticSettingAndVersionCheck() { + ClusterManagerThrottlingStats throttlingStats = new 
ClusterManagerThrottlingStats(); + DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_5_0); + DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); + setState( + clusterService, + ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) + ); + + // setting threshold in initial settings + ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + int put_mapping_threshold_value = randomIntBetween(1, 10); + Settings initialSettings = Settings.builder() + .put("cluster_manager.throttling.thresholds.put-mapping.value", put_mapping_threshold_value) + .build(); + ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( + initialSettings, + clusterSettings, + () -> { return clusterService.getMasterService().getMinNodeVersion(); }, + throttlingStats + ); + ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask("put-mapping", true); + + // verifying that adding more tasks than the threshold passes + throttler.onBeginSubmit(getMockUpdateTaskList("put-mapping", throttlingKey, put_mapping_threshold_value + 5)); + assertEquals(0L, throttlingStats.getThrottlingCount("put-mapping")); + + // Removing older version node from cluster + setState( + clusterService, + ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode }) + ); + + // adding more tasks, these tasks should be throttled + // as the queue already has more tasks than the threshold from the previous call. + assertThrows( + ClusterManagerThrottlingException.class, + () -> throttler.onBeginSubmit(getMockUpdateTaskList("put-mapping", throttlingKey, 3)) + ); + assertEquals(3L, throttlingStats.getThrottlingCount("put-mapping")); + } + public void testThrottling() { ClusterManagerThrottlingStats throttlingStats = new ClusterManagerThrottlingStats(); String taskKey = "test";