From 94c5bd7a9ab1b084f057e360eda61a36dccacf90 Mon Sep 17 00:00:00 2001 From: Bukhtawar Khan Date: Fri, 11 Nov 2022 00:19:23 +0530 Subject: [PATCH] [Backport 2.4]Revert "Cluster manager task throttling feature [Final PR] (#5071) (#5203) * Revert "Cluster manager task throttling feature [Final PR] (#4986) (#5071)" This reverts commit e0d1c2e34ed39c364ce2cb5243330e30cde2b05f. Signed-off-by: Bukhtawar Khan * Fix up change log Signed-off-by: Bukhtawar Khan * Fix up release notes Signed-off-by: Bukhtawar Khan * Cleaning up the deleted file Signed-off-by: Bukhtawar Khan Signed-off-by: Bukhtawar Khan --- CHANGELOG.md | 1 - .../opensearch.release-notes-2.4.0.md | 1 - .../ClusterManagerTaskThrottlingIT.java | 214 ---------- .../org/opensearch/OpenSearchException.java | 7 - .../TransportClusterRerouteAction.java | 10 - .../TransportClusterUpdateSettingsAction.java | 13 - .../TransportDeleteStoredScriptAction.java | 7 +- .../TransportPutStoredScriptAction.java | 7 +- .../indices/create/AutoCreateAction.java | 11 - .../TransportDeleteDanglingIndexAction.java | 10 - .../datastream/DeleteDataStreamAction.java | 10 - .../rollover/TransportRolloverAction.java | 10 - .../opensearch/action/bulk/BackoffPolicy.java | 126 ------ .../action/support/RetryableAction.java | 41 +- .../TransportClusterManagerNodeAction.java | 55 +-- .../cluster/ClusterStateTaskExecutor.java | 11 - .../MetadataCreateDataStreamService.java | 10 - .../metadata/MetadataCreateIndexService.java | 11 - .../metadata/MetadataDeleteIndexService.java | 12 - .../metadata/MetadataIndexAliasesService.java | 12 - .../MetadataIndexTemplateService.java | 52 --- .../metadata/MetadataMappingService.java | 12 - .../MetadataUpdateSettingsService.java | 11 - .../service/ClusterManagerTaskKeys.java | 49 --- .../service/ClusterManagerTaskThrottler.java | 200 ---------- .../ClusterManagerTaskThrottlerListener.java | 16 - .../ClusterManagerThrottlingException.java | 28 -- .../ClusterManagerThrottlingStats.java | 42 -- .../cluster/service/ClusterService.java | 11 - .../cluster/service/MasterService.java | 36 +- .../cluster/service/TaskBatcher.java | 72 ++-- .../cluster/service/TaskBatcherListener.java | 41 -- .../common/settings/ClusterSettings.java | 2 - .../org/opensearch/ingest/IngestService.java | 19 +- .../PersistentTasksClusterService.java | 32 -- .../repositories/RepositoriesService.java | 17 - .../blobstore/BlobStoreRepository.java | 6 - .../org/opensearch/script/ScriptService.java | 13 - .../opensearch/snapshots/RestoreService.java | 13 - .../snapshots/SnapshotsService.java | 36 +- .../ExceptionSerializationTests.java | 2 - .../action/bulk/BackoffPolicyTests.java | 41 -- ...ransportClusterManagerNodeActionTests.java | 102 ----- .../MetadataDeleteIndexServiceTests.java | 3 +- .../MetadataIndexAliasesServiceTests.java | 3 +- .../MetadataIndexTemplateServiceTests.java | 29 +- .../ClusterManagerTaskThrottlerTests.java | 366 ------------------ .../cluster/service/MasterServiceTests.java | 267 ------------- .../cluster/service/TaskBatcherTests.java | 28 +- .../PersistentTasksClusterServiceTests.java | 17 +- 50 files changed, 92 insertions(+), 2053 deletions(-) delete mode 100644 server/src/internalClusterTest/java/org/opensearch/clustermanager/ClusterManagerTaskThrottlingIT.java delete mode 100644 server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java delete mode 100644 server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java delete mode 100644 server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerListener.java delete mode 100644 server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java delete mode 100644 server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingStats.java delete mode 100644 server/src/main/java/org/opensearch/cluster/service/TaskBatcherListener.java delete mode 100644 server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e9e2cd624d6eb..19b3a5e2cdee1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,6 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Introduce point in time search feature ([#3959](https://github.com/opensearch-project/OpenSearch/issues/3959)) - Introduce experimental searchable snapshot feature ([#2919](https://github.com/opensearch-project/OpenSearch/issues/2919)) - Add API for decommissioning/recommissioning zone and weighted zonal search request routing policy ([#3639](https://github.com/opensearch-project/OpenSearch/issues/3639)) -- Introduce cluster manager task throttling framework [#479](https://github.com/opensearch-project/OpenSearch/issues/479) - Add support for s390x architecture ([#4001](https://github.com/opensearch-project/OpenSearch/pull/4001)) - Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085)) - Add failover support with Segment Replication enabled. ([#4325](https://github.com/opensearch-project/OpenSearch/pull/4325) diff --git a/release-notes/opensearch.release-notes-2.4.0.md b/release-notes/opensearch.release-notes-2.4.0.md index 2a377aa8fce0b..f6a1378a55ee1 100644 --- a/release-notes/opensearch.release-notes-2.4.0.md +++ b/release-notes/opensearch.release-notes-2.4.0.md @@ -5,7 +5,6 @@ - Introduce point in time search feature ([#3959](https://github.com/opensearch-project/OpenSearch/issues/3959)) - Introduce experimental searchable snapshot feature ([#2919](https://github.com/opensearch-project/OpenSearch/issues/2919)) - Add API for decommissioning/recommissioning zone and weighted zonal search request routing policy ([#3639](https://github.com/opensearch-project/OpenSearch/issues/3639)) -- Introduce cluster manager task throttling framework [#479](https://github.com/opensearch-project/OpenSearch/issues/479) - Add support for s390x architecture ([#4001](https://github.com/opensearch-project/OpenSearch/pull/4001)) - Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085)) - Add failover support with Segment Replication enabled. ([#4325](https://github.com/opensearch-project/OpenSearch/pull/4325) diff --git a/server/src/internalClusterTest/java/org/opensearch/clustermanager/ClusterManagerTaskThrottlingIT.java b/server/src/internalClusterTest/java/org/opensearch/clustermanager/ClusterManagerTaskThrottlingIT.java deleted file mode 100644 index 9817861c88e9a..0000000000000 --- a/server/src/internalClusterTest/java/org/opensearch/clustermanager/ClusterManagerTaskThrottlingIT.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.clustermanager; - -import org.opensearch.action.ActionListener; -import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; -import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; -import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; -import org.opensearch.cluster.service.ClusterManagerThrottlingException; -import org.opensearch.common.settings.Settings; -import org.opensearch.test.OpenSearchIntegTestCase; -import org.opensearch.transport.TransportService; -import org.opensearch.transport.TransportMessageListener; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; - -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; - -@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 0) -public class ClusterManagerTaskThrottlingIT extends OpenSearchIntegTestCase { - - /* - * This integ test will test end-end cluster manager throttling feature for - * remote cluster manager. - * - * It will check the number of request coming to cluster manager node - * should be total number of requests + throttled requests from cluster manager. - * This will ensure the end-end feature is working as cluster manager is throwing - * Throttling exception and data node is performing retries on it. - * - */ - public void testThrottlingForRemoteClusterManager() throws Exception { - try { - internalCluster().beforeTest(random()); - String clusterManagerNode = internalCluster().startClusterManagerOnlyNode(); - String dataNode = internalCluster().startDataOnlyNode(); - int throttlingLimit = randomIntBetween(1, 5); - createIndex("test"); - setPutMappingThrottlingLimit(throttlingLimit); - - TransportService clusterManagerTransportService = (internalCluster().getInstance(TransportService.class, clusterManagerNode)); - AtomicInteger requestCountOnClusterManager = new AtomicInteger(); - AtomicInteger throttledRequest = new AtomicInteger(); - int totalRequest = randomIntBetween(throttlingLimit, 5 * throttlingLimit); - CountDownLatch latch = new CountDownLatch(totalRequest); - - clusterManagerTransportService.addMessageListener(new TransportMessageListener() { - @Override - public void onRequestReceived(long requestId, String action) { - if (action.contains("mapping")) { - requestCountOnClusterManager.incrementAndGet(); - } - } - - @Override - public void onResponseSent(long requestId, String action, Exception error) { - if (action.contains("mapping")) { - throttledRequest.incrementAndGet(); - assertEquals(ClusterManagerThrottlingException.class, error.getClass()); - } - } - }); - - ActionListener listener = new ActionListener() { - @Override - public void onResponse(Object o) { - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - latch.countDown(); - throw new AssertionError(e); - } - }; - - executePutMappingRequests(totalRequest, dataNode, listener); - latch.await(); - - assertEquals(totalRequest + throttledRequest.get(), requestCountOnClusterManager.get()); - assertBusy( - () -> { assertEquals(clusterService().getMasterService().numberOfThrottledPendingTasks(), throttledRequest.get()); } - ); - } finally { - clusterSettingCleanUp(); - } - } - - /* - * This will test the throttling feature for single node. - * - * Here we will assert the client behaviour that client's request is not - * failed, i.e. Throttling exception is not passed to the client. - * Data node will internally do the retry and request should pass. - * - */ - public void testThrottlingForSingleNode() throws Exception { - try { - internalCluster().beforeTest(random()); - String node = internalCluster().startNode(); - int throttlingLimit = randomIntBetween(1, 5); - createIndex("test"); - setPutMappingThrottlingLimit(throttlingLimit); - - AtomicInteger successfulRequest = new AtomicInteger(); - int totalRequest = randomIntBetween(throttlingLimit, 3 * throttlingLimit); - CountDownLatch latch = new CountDownLatch(totalRequest); - - ActionListener listener = new ActionListener() { - @Override - public void onResponse(Object o) { - successfulRequest.incrementAndGet(); - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - latch.countDown(); - throw new AssertionError(e); - } - }; - executePutMappingRequests(totalRequest, node, listener); - - latch.await(); - assertEquals(totalRequest, successfulRequest.get()); - } finally { - clusterSettingCleanUp(); - } - } - - /* - * This will test the timeout of tasks during throttling. - * - * Here we will assert the client behaviour that client's request is not - * failed with throttling exception but timeout exception. - * It also verifies that if limit is set to 0, all tasks are getting timedout. - */ - - public void testTimeoutWhileThrottling() throws Exception { - try { - internalCluster().beforeTest(random()); - String node = internalCluster().startNode(); - int throttlingLimit = 0; // throttle all the tasks - createIndex("test"); - setPutMappingThrottlingLimit(throttlingLimit); - - AtomicInteger timedoutRequest = new AtomicInteger(); - int totalRequest = randomIntBetween(1, 5); - CountDownLatch latch = new CountDownLatch(totalRequest); - - ActionListener listener = new ActionListener() { - @Override - public void onResponse(Object o) { - latch.countDown(); - throw new AssertionError("Request should not succeed"); - } - - @Override - public void onFailure(Exception e) { - timedoutRequest.incrementAndGet(); - latch.countDown(); - assertTrue(e instanceof ProcessClusterEventTimeoutException); - } - }; - executePutMappingRequests(totalRequest, node, listener); - - latch.await(); - assertEquals(totalRequest, timedoutRequest.get()); // verifying all requests were timed out with 0 throttling limit - } finally { - clusterSettingCleanUp(); - } - } - - private void executePutMappingRequests(int totalRequest, String node, ActionListener listener) throws Exception { - Thread[] threads = new Thread[totalRequest]; - for (int i = 0; i < totalRequest; i++) { - PutMappingRequest putMappingRequest = new PutMappingRequest("test").source("field" + i, "type=text"); - threads[i] = new Thread(new Runnable() { - @Override - public void run() { - internalCluster().client(node).admin().indices().putMapping(putMappingRequest, listener); - } - }); - } - for (int i = 0; i < totalRequest; i++) { - threads[i].run(); - } - for (int i = 0; i < totalRequest; i++) { - threads[i].join(); - } - } - - private void setPutMappingThrottlingLimit(int throttlingLimit) { - ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); - Settings settings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", throttlingLimit).build(); - settingsRequest.transientSettings(settings); - assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet()); - } - - private void clusterSettingCleanUp() { - // We need to remove the throttling limit from setting as part of test cleanup - ClusterUpdateSettingsRequest settingsRequest = new ClusterUpdateSettingsRequest(); - Settings settings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", (String) null).build(); - settingsRequest.transientSettings(settings); - assertAcked(client().admin().cluster().updateSettings(settingsRequest).actionGet()); - } -} diff --git a/server/src/main/java/org/opensearch/OpenSearchException.java b/server/src/main/java/org/opensearch/OpenSearchException.java index a8107a80ea8b7..28301c3897630 100644 --- a/server/src/main/java/org/opensearch/OpenSearchException.java +++ b/server/src/main/java/org/opensearch/OpenSearchException.java @@ -34,7 +34,6 @@ import org.opensearch.action.support.replication.ReplicationOperation; import org.opensearch.cluster.action.shard.ShardStateAction; -import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.common.CheckedFunction; import org.opensearch.common.Nullable; import org.opensearch.common.ParseField; @@ -1620,12 +1619,6 @@ private enum OpenSearchExceptionHandle { org.opensearch.cluster.decommission.NodeDecommissionedException::new, 164, V_2_4_0 - ), - CLUSTER_MANAGER_TASK_THROTTLED_EXCEPTION( - ClusterManagerThrottlingException.class, - ClusterManagerThrottlingException::new, - 165, - Version.V_2_4_0 ); final Class exceptionClass; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index e9ae23f6b9e34..3e5ebdd6a17d3 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -54,8 +54,6 @@ import org.opensearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand; import org.opensearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; import org.opensearch.cluster.routing.allocation.command.AllocationCommand; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.Strings; @@ -82,7 +80,6 @@ public class TransportClusterRerouteAction extends TransportClusterManagerNodeAc private static final Logger logger = LogManager.getLogger(TransportClusterRerouteAction.class); private final AllocationService allocationService; - private static ClusterManagerTaskThrottler.ThrottlingKey clusterRerouteTaskKey; @Inject public TransportClusterRerouteAction( @@ -103,8 +100,6 @@ public TransportClusterRerouteAction( indexNameExpressionResolver ); this.allocationService = allocationService; - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - clusterRerouteTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CLUSTER_REROUTE_API_KEY, true); } @Override @@ -246,11 +241,6 @@ static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClus this.allocationService = allocationService; } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return clusterRerouteTaskKey; - } - @Override protected ClusterRerouteResponse newResponse(boolean acknowledged) { return new ClusterRerouteResponse(acknowledged, clusterStateToSend, explanations); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index e9cb6a78f5269..ef404375485a2 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -47,8 +47,6 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -75,8 +73,6 @@ public class TransportClusterUpdateSettingsAction extends TransportClusterManage private final ClusterSettings clusterSettings; - private final ClusterManagerTaskThrottler.ThrottlingKey clusterUpdateSettingTaskKey; - @Inject public TransportClusterUpdateSettingsAction( TransportService transportService, @@ -99,10 +95,6 @@ public TransportClusterUpdateSettingsAction( ); this.allocationService = allocationService; this.clusterSettings = clusterSettings; - - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - clusterUpdateSettingTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CLUSTER_UPDATE_SETTINGS_KEY, true); - } @Override @@ -144,11 +136,6 @@ protected void clusterManagerOperation( private volatile boolean changed = false; - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return clusterUpdateSettingTaskKey; - } - @Override protected ClusterUpdateSettingsResponse newResponse(boolean acknowledged) { return new ClusterUpdateSettingsResponse(acknowledged, updater.getTransientUpdates(), updater.getPersistentUpdate()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java index e41ec2b1f737c..4bc8d836a8200 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportDeleteStoredScriptAction.java @@ -40,8 +40,6 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; @@ -59,7 +57,6 @@ public class TransportDeleteStoredScriptAction extends TransportClusterManagerNodeAction { private final ScriptService scriptService; - private final ClusterManagerTaskThrottler.ThrottlingKey deleteScriptTaskKey; @Inject public TransportDeleteStoredScriptAction( @@ -80,8 +77,6 @@ public TransportDeleteStoredScriptAction( indexNameExpressionResolver ); this.scriptService = scriptService; - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - deleteScriptTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SCRIPT_KEY, true); } @Override @@ -100,7 +95,7 @@ protected void clusterManagerOperation( ClusterState state, ActionListener listener ) throws Exception { - scriptService.deleteStoredScript(clusterService, request, deleteScriptTaskKey, listener); + scriptService.deleteStoredScript(clusterService, request, listener); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java index 8ffe4d2b74695..bb259f173d470 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/storedscripts/TransportPutStoredScriptAction.java @@ -40,8 +40,6 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; @@ -59,7 +57,6 @@ public class TransportPutStoredScriptAction extends TransportClusterManagerNodeAction { private final ScriptService scriptService; - private final ClusterManagerTaskThrottler.ThrottlingKey putScriptTaskKey; @Inject public TransportPutStoredScriptAction( @@ -80,8 +77,6 @@ public TransportPutStoredScriptAction( indexNameExpressionResolver ); this.scriptService = scriptService; - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - putScriptTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_SCRIPT_KEY, true); } @Override @@ -100,7 +95,7 @@ protected void clusterManagerOperation( ClusterState state, ActionListener listener ) throws Exception { - scriptService.putStoredScript(clusterService, request, putScriptTaskKey, listener); + scriptService.putStoredScript(clusterService, request, listener); } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java b/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java index 77f09f02c9a9c..73a2996945aff 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/create/AutoCreateAction.java @@ -50,8 +50,6 @@ import org.opensearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest; import org.opensearch.cluster.metadata.MetadataCreateIndexService; import org.opensearch.cluster.metadata.MetadataIndexTemplateService; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.inject.Inject; @@ -86,7 +84,6 @@ public static final class TransportAction extends TransportClusterManagerNodeAct private final ActiveShardsObserver activeShardsObserver; private final MetadataCreateIndexService createIndexService; private final MetadataCreateDataStreamService metadataCreateDataStreamService; - private final ClusterManagerTaskThrottler.ThrottlingKey autoCreateTaskKey; @Inject public TransportAction( @@ -102,9 +99,6 @@ public TransportAction( this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); this.createIndexService = createIndexService; this.metadataCreateDataStreamService = metadataCreateDataStreamService; - - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - autoCreateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.AUTO_CREATE_KEY, true); } @Override @@ -148,11 +142,6 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return autoCreateTaskKey; - } - @Override public ClusterState execute(ClusterState currentState) throws Exception { DataStreamTemplate dataStreamTemplate = resolveAutoCreateDataStream(request, currentState.metadata()); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java b/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java index e14125c21af9c..015a0f6727ab7 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/dangling/delete/TransportDeleteDanglingIndexAction.java @@ -54,8 +54,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.StreamInput; @@ -82,7 +80,6 @@ public class TransportDeleteDanglingIndexAction extends TransportClusterManagerN private final Settings settings; private final NodeClient nodeClient; - private final ClusterManagerTaskThrottler.ThrottlingKey deleteDanglingIndexTaskKey; @Inject public TransportDeleteDanglingIndexAction( @@ -105,8 +102,6 @@ public TransportDeleteDanglingIndexAction( ); this.settings = settings; this.nodeClient = nodeClient; - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - deleteDanglingIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_DANGLING_INDEX_KEY, true); } @Override @@ -162,11 +157,6 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { return new AcknowledgedResponse(acknowledged); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return deleteDanglingIndexTaskKey; - } - @Override public ClusterState execute(final ClusterState currentState) { return deleteDanglingIndex(currentState, indexToDelete); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java index 9260904025df2..74b0a84782283 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -50,8 +50,6 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.MetadataDeleteIndexService; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.Strings; @@ -169,7 +167,6 @@ public IndicesRequest indices(String... indices) { public static class TransportAction extends TransportClusterManagerNodeAction { private final MetadataDeleteIndexService deleteIndexService; - private final ClusterManagerTaskThrottler.ThrottlingKey removeDataStreamTaskKey; @Inject public TransportAction( @@ -182,8 +179,6 @@ public TransportAction( ) { super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); this.deleteIndexService = deleteIndexService; - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - removeDataStreamTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_DATA_STREAM_KEY, true); } @Override @@ -213,11 +208,6 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return removeDataStreamTaskKey; - } - @Override public ClusterState execute(ClusterState currentState) { return removeDataStream(deleteIndexService, currentState, request); diff --git a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java index b4ecae2ee08ba..4e5e7ec9184fe 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -49,8 +49,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.inject.Inject; @@ -79,7 +77,6 @@ public class TransportRolloverAction extends TransportClusterManagerNodeAction iterator() { - return new ExponentialEqualJitterBackoffIterator(baseDelay, maxDelayForRetry); - } - } - - private static class ExponentialEqualJitterBackoffIterator implements Iterator { - /** - * Retry limit to avoids integer overflow issues. - * Post this limit, max delay will be returned with Equal Jitter. - * - * NOTE: If the value is greater than 30, there can be integer overflow - * issues during delay calculation. - **/ - private final int RETRIES_TILL_JITTER_INCREASE = 30; - - /** - * Exponential increase in delay will happen till it reaches maxDelayForRetry. - * Once delay has exceeded maxDelayForRetry, it will return maxDelayForRetry only - * and not increase the delay. - */ - private final int maxDelayForRetry; - private final int baseDelay; - private int retriesAttempted; - - private ExponentialEqualJitterBackoffIterator(int baseDelay, int maxDelayForRetry) { - this.baseDelay = baseDelay; - this.maxDelayForRetry = maxDelayForRetry; - } - - /** - * There is not any limit for this BackOff. - * This Iterator will always return back off delay. - * - * @return true - */ - @Override - public boolean hasNext() { - return true; - } - - @Override - public TimeValue next() { - int retries = Math.min(retriesAttempted, RETRIES_TILL_JITTER_INCREASE); - int exponentialDelay = (int) Math.min((1L << retries) * baseDelay, maxDelayForRetry); - retriesAttempted++; - return TimeValue.timeValueMillis((exponentialDelay / 2) + Randomness.get().nextInt(exponentialDelay / 2 + 1)); - } - } - - private static class ExponentialFullJitterBackoff extends BackoffPolicy { - private final long baseDelay; - - private ExponentialFullJitterBackoff(long baseDelay) { - this.baseDelay = baseDelay; - } - - @Override - public Iterator iterator() { - return new ExponentialFullJitterBackoffIterator(baseDelay); - } - } - - private static class ExponentialFullJitterBackoffIterator implements Iterator { - /** - * Current delay in exponential backoff - */ - private long currentDelay; - - private ExponentialFullJitterBackoffIterator(long baseDelay) { - this.currentDelay = baseDelay; - } - - /** - * There is not any limit for this BackOff. - * This Iterator will always return back off delay. - * - * @return true - */ - @Override - public boolean hasNext() { - return true; - } - - @Override - public TimeValue next() { - TimeValue delayToReturn = TimeValue.timeValueMillis(Randomness.get().nextInt(Math.toIntExact(currentDelay)) + 1); - currentDelay = Math.min(2 * currentDelay, Integer.MAX_VALUE); - return delayToReturn; - } - } - /** * Concrete Constant Back Off Policy * diff --git a/server/src/main/java/org/opensearch/action/support/RetryableAction.java b/server/src/main/java/org/opensearch/action/support/RetryableAction.java index 281cf728fb18c..38b7e6ec2a8a0 100644 --- a/server/src/main/java/org/opensearch/action/support/RetryableAction.java +++ b/server/src/main/java/org/opensearch/action/support/RetryableAction.java @@ -36,14 +36,13 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; import org.opensearch.action.ActionRunnable; -import org.opensearch.action.bulk.BackoffPolicy; +import org.opensearch.common.Randomness; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.OpenSearchRejectedExecutionException; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayDeque; -import java.util.Iterator; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -65,7 +64,6 @@ public abstract class RetryableAction { private final long startMillis; private final ActionListener finalListener; private final String executor; - private final BackoffPolicy backoffPolicy; private volatile Scheduler.ScheduledCancellable retryTask; @@ -76,15 +74,7 @@ public RetryableAction( TimeValue timeoutValue, ActionListener listener ) { - this( - logger, - threadPool, - initialDelay, - timeoutValue, - listener, - BackoffPolicy.exponentialFullJitterBackoff(initialDelay.getMillis()), - ThreadPool.Names.SAME - ); + this(logger, threadPool, initialDelay, timeoutValue, listener, ThreadPool.Names.SAME); } public RetryableAction( @@ -93,7 +83,6 @@ public RetryableAction( TimeValue initialDelay, TimeValue timeoutValue, ActionListener listener, - BackoffPolicy backoffPolicy, String executor ) { this.logger = logger; @@ -106,11 +95,10 @@ public RetryableAction( this.startMillis = threadPool.relativeTimeInMillis(); this.finalListener = listener; this.executor = executor; - this.backoffPolicy = backoffPolicy; } public void run() { - final RetryingListener retryingListener = new RetryingListener(backoffPolicy.iterator(), null); + final RetryingListener retryingListener = new RetryingListener(initialDelayMillis, null); final Runnable runnable = createRunnable(retryingListener); threadPool.executor(executor).execute(runnable); } @@ -154,24 +142,16 @@ public void onRejection(Exception e) { public void onFinished() {} - /** - * Retry able task may want to throw different Exception on timeout, - * they can override it method for that. - */ - public Exception getTimeoutException(Exception e) { - return e; - } - private class RetryingListener implements ActionListener { private static final int MAX_EXCEPTIONS = 4; + private final long delayMillisBound; private ArrayDeque caughtExceptions; - private Iterator backoffDelayIterator; - private RetryingListener(Iterator backoffDelayIterator, ArrayDeque caughtExceptions) { + private RetryingListener(long delayMillisBound, ArrayDeque caughtExceptions) { + this.delayMillisBound = delayMillisBound; this.caughtExceptions = caughtExceptions; - this.backoffDelayIterator = backoffDelayIterator; } @Override @@ -191,13 +171,16 @@ public void onFailure(Exception e) { () -> new ParameterizedMessage("retryable action timed out after {}", TimeValue.timeValueMillis(elapsedMillis)), e ); - onFinalFailure(getTimeoutException(e)); + onFinalFailure(e); } else { addException(e); - final TimeValue delay = backoffDelayIterator.next(); - final Runnable runnable = createRunnable(this); + final long nextDelayMillisBound = Math.min(delayMillisBound * 2, Integer.MAX_VALUE); + final RetryingListener retryingListener = new RetryingListener(nextDelayMillisBound, caughtExceptions); + final Runnable runnable = createRunnable(retryingListener); + final long delayMillis = Randomness.get().nextInt(Math.toIntExact(delayMillisBound)) + 1; if (isDone.get() == false) { + final TimeValue delay = TimeValue.timeValueMillis(delayMillis); logger.debug(() -> new ParameterizedMessage("retrying action that failed in {}", delay), e); try { retryTask = threadPool.schedule(runnable, delay, executor); diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index be075e605a21d..a97f4ffe555b6 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -39,10 +39,8 @@ import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.ActionResponse; import org.opensearch.action.ActionRunnable; -import org.opensearch.action.bulk.BackoffPolicy; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.action.support.RetryableAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.ClusterManagerNodeChangePredicate; @@ -50,10 +48,8 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.coordination.FailedToCommitClusterStateException; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; -import org.opensearch.cluster.metadata.ProcessClusterEventTimeoutException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.Writeable; @@ -160,10 +156,12 @@ protected boolean localExecute(Request request) { @Override protected void doExecute(Task task, final Request request, ActionListener listener) { + ClusterState state = clusterService.state(); + logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); if (task != null) { request.setParentTask(clusterService.localNode().getId(), task.getId()); } - new AsyncSingleAction(task, request, listener).run(); + new AsyncSingleAction(task, request, listener).doStart(state); } /** @@ -171,62 +169,21 @@ protected void doExecute(Task task, final Request request, ActionListener listener; + private final ActionListener listener; private final Request request; private ClusterStateObserver observer; private final long startTime; private final Task task; - private static final int BASE_DELAY_MILLIS = 10; - private static final int MAX_DELAY_MILLIS = 5000; AsyncSingleAction(Task task, Request request, ActionListener listener) { - super( - logger, - threadPool, - TimeValue.timeValueMillis(BASE_DELAY_MILLIS), - request.clusterManagerNodeTimeout, - listener, - BackoffPolicy.exponentialEqualJitterBackoff(BASE_DELAY_MILLIS, MAX_DELAY_MILLIS), - ThreadPool.Names.SAME - ); this.task = task; this.request = request; + this.listener = listener; this.startTime = threadPool.relativeTimeInMillis(); } - @Override - public void tryAction(ActionListener retryListener) { - ClusterState state = clusterService.state(); - logger.trace("starting processing request [{}] with cluster state version [{}]", request, state.version()); - this.listener = retryListener; - doStart(state); - } - - @Override - public boolean shouldRetry(Exception e) { - // If remote address is null, i.e request is generated from same node and we would want to perform retry for it - // If remote address is not null, i.e request is generated from remote node and received on this master node on transport layer - // in that case we would want throttling retry to perform on remote node only not on this master node. - if (request.remoteAddress() == null) { - if (e instanceof TransportException) { - return ((TransportException) e).unwrapCause() instanceof ClusterManagerThrottlingException; - } - return e instanceof ClusterManagerThrottlingException; - } - return false; - } - - /** - * If tasks gets timed out in retrying on throttling, - * it should send cluster event timeout exception. - */ - @Override - public Exception getTimeoutException(Exception e) { - return new ProcessClusterEventTimeoutException(request.masterNodeTimeout, actionName); - } - protected void doStart(ClusterState clusterState) { try { final DiscoveryNodes nodes = clusterState.nodes(); diff --git a/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java b/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java index 50beeb1f03deb..976019ae77d6c 100644 --- a/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java +++ b/server/src/main/java/org/opensearch/cluster/ClusterStateTaskExecutor.java @@ -31,7 +31,6 @@ package org.opensearch.cluster; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.common.Nullable; import java.util.IdentityHashMap; @@ -89,16 +88,6 @@ default String describeTasks(List tasks) { return String.join(", ", tasks.stream().map(t -> (CharSequence) t.toString()).filter(t -> t.length() > 0)::iterator); } - /** - * Throttling key associated with the task, on which cluster manager node will do aggregation count - * and perform throttling based on configured threshold in cluster setting. - */ - default ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - // Default task is not registered with clusterService.registerClusterMangerTask, - // User can't configure throttling limit on it and will be bypassed while throttling on cluster manager - return ClusterManagerTaskThrottler.DEFAULT_THROTTLING_KEY; - } - /** * Represents the result of a batched execution of cluster state update tasks * @param the type of the cluster state update task diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java index 7be5ea7e2c34a..412d4dba628cb 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateDataStreamService.java @@ -45,8 +45,6 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ack.ClusterStateUpdateRequest; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.settings.Settings; @@ -76,7 +74,6 @@ public class MetadataCreateDataStreamService { private final ClusterService clusterService; private final ActiveShardsObserver activeShardsObserver; private final MetadataCreateIndexService metadataCreateIndexService; - private final ClusterManagerTaskThrottler.ThrottlingKey createDataStreamTaskKey; public MetadataCreateDataStreamService( ThreadPool threadPool, @@ -86,8 +83,6 @@ public MetadataCreateDataStreamService( this.clusterService = clusterService; this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); this.metadataCreateIndexService = metadataCreateIndexService; - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - createDataStreamTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_DATA_STREAM_KEY, true); } public void createDataStream(CreateDataStreamClusterStateUpdateRequest request, ActionListener finalListener) { @@ -118,11 +113,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { return clusterState; } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return createDataStreamTaskKey; - } - @Override protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index 879a7421251fb..46be91fff398b 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -60,8 +60,6 @@ import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -149,7 +147,6 @@ public class MetadataCreateIndexService { private final ShardLimitValidator shardLimitValidator; private final boolean forbidPrivateIndexSettings; private final Set indexSettingProviders = new HashSet<>(); - private final ClusterManagerTaskThrottler.ThrottlingKey createIndexTaskKey; private AwarenessReplicaBalance awarenessReplicaBalance; public MetadataCreateIndexService( @@ -180,9 +177,6 @@ public MetadataCreateIndexService( this.forbidPrivateIndexSettings = forbidPrivateIndexSettings; this.shardLimitValidator = shardLimitValidator; this.awarenessReplicaBalance = awarenessReplicaBalance; - - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true); } /** @@ -332,11 +326,6 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return createIndexTaskKey; - } - @Override public ClusterState execute(ClusterState currentState) throws Exception { return applyCreateIndexRequest(currentState, request, false); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java index 655b5ceb376f5..66f5edf3da129 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataDeleteIndexService.java @@ -43,8 +43,6 @@ import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.collect.ImmutableOpenMap; @@ -75,17 +73,12 @@ public class MetadataDeleteIndexService { private final ClusterService clusterService; private final AllocationService allocationService; - private final ClusterManagerTaskThrottler.ThrottlingKey deleteIndexTaskKey; @Inject public MetadataDeleteIndexService(Settings settings, ClusterService clusterService, AllocationService allocationService) { this.settings = settings; this.clusterService = clusterService; this.allocationService = allocationService; - - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - deleteIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_INDEX_KEY, true); - } public void deleteIndices( @@ -105,11 +98,6 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return deleteIndexTaskKey; - } - @Override public ClusterState execute(final ClusterState currentState) { return deleteIndices(currentState, Sets.newHashSet(request.indices())); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java index 7f5a5e876d373..8d6939a57240c 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexAliasesService.java @@ -39,8 +39,6 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; import org.opensearch.cluster.metadata.AliasAction.NewAliasValidator; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.Strings; @@ -80,7 +78,6 @@ public class MetadataIndexAliasesService { private final MetadataDeleteIndexService deleteIndexService; private final NamedXContentRegistry xContentRegistry; - private final ClusterManagerTaskThrottler.ThrottlingKey indexAliasTaskKey; @Inject public MetadataIndexAliasesService( @@ -95,10 +92,6 @@ public MetadataIndexAliasesService( this.aliasValidator = aliasValidator; this.deleteIndexService = deleteIndexService; this.xContentRegistry = xContentRegistry; - - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - indexAliasTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.INDEX_ALIASES_KEY, true); - } public void indicesAliases( @@ -113,11 +106,6 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return indexAliasTaskKey; - } - @Override public ClusterState execute(ClusterState currentState) { return applyAliasActions(currentState, request.actions()); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java index c2160b37f2722..7e91b491a234c 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataIndexTemplateService.java @@ -45,8 +45,6 @@ import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -111,12 +109,6 @@ public class MetadataIndexTemplateService { private final MetadataCreateIndexService metadataCreateIndexService; private final IndexScopedSettings indexScopedSettings; private final NamedXContentRegistry xContentRegistry; - private final ClusterManagerTaskThrottler.ThrottlingKey createIndexTemplateTaskKey; - private final ClusterManagerTaskThrottler.ThrottlingKey createIndexTemplateV2TaskKey; - private final ClusterManagerTaskThrottler.ThrottlingKey removeIndexTemplateTaskKey; - private final ClusterManagerTaskThrottler.ThrottlingKey removeIndexTemplateV2TaskKey; - private final ClusterManagerTaskThrottler.ThrottlingKey createComponentTemplateTaskKey; - private final ClusterManagerTaskThrottler.ThrottlingKey removeComponentTemplateTaskKey; @Inject public MetadataIndexTemplateService( @@ -133,20 +125,6 @@ public MetadataIndexTemplateService( this.metadataCreateIndexService = metadataCreateIndexService; this.indexScopedSettings = indexScopedSettings; this.xContentRegistry = xContentRegistry; - - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - createIndexTemplateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_TEMPLATE_KEY, true); - createIndexTemplateV2TaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_TEMPLATE_V2_KEY, true); - removeIndexTemplateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_INDEX_TEMPLATE_KEY, true); - removeIndexTemplateV2TaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_INDEX_TEMPLATE_V2_KEY, true); - createComponentTemplateTaskKey = clusterService.registerClusterManagerTask( - ClusterManagerTaskKeys.CREATE_COMPONENT_TEMPLATE_KEY, - true - ); - removeComponentTemplateTaskKey = clusterService.registerClusterManagerTask( - ClusterManagerTaskKeys.REMOVE_COMPONENT_TEMPLATE_KEY, - true - ); } public void removeTemplates(final RemoveRequest request, final RemoveListener listener) { @@ -162,11 +140,6 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return removeIndexTemplateTaskKey; - } - @Override public ClusterState execute(ClusterState currentState) { Set templateNames = new HashSet<>(); @@ -225,11 +198,6 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return createComponentTemplateTaskKey; - } - @Override public ClusterState execute(ClusterState currentState) throws Exception { return addComponentTemplate(currentState, create, name, template); @@ -390,11 +358,6 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return removeComponentTemplateTaskKey; - } - @Override public ClusterState execute(ClusterState currentState) { Set templateNames = new HashSet<>(); @@ -484,11 +447,6 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return createIndexTemplateV2TaskKey; - } - @Override public ClusterState execute(ClusterState currentState) throws Exception { return addIndexTemplateV2(currentState, create, name, template); @@ -806,11 +764,6 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return removeIndexTemplateV2TaskKey; - } - @Override public ClusterState execute(ClusterState currentState) { return innerRemoveIndexTemplateV2(currentState, name); @@ -915,11 +868,6 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return createIndexTemplateTaskKey; - } - @Override public ClusterState execute(ClusterState currentState) throws Exception { validateTemplate(request.settings, request.mappings, indicesService, xContentRegistry); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java index deb4dfd2581bf..7f67c45fc80e5 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataMappingService.java @@ -43,8 +43,6 @@ import org.opensearch.cluster.ClusterStateTaskExecutor; import org.opensearch.cluster.ack.ClusterStateUpdateResponse; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -80,7 +78,6 @@ public class MetadataMappingService { private final ClusterService clusterService; private final IndicesService indicesService; - private final ClusterManagerTaskThrottler.ThrottlingKey putMappingTaskKey; final RefreshTaskExecutor refreshExecutor = new RefreshTaskExecutor(); final PutMappingExecutor putMappingExecutor = new PutMappingExecutor(); @@ -89,10 +86,6 @@ public class MetadataMappingService { public MetadataMappingService(ClusterService clusterService, IndicesService indicesService) { this.clusterService = clusterService; this.indicesService = indicesService; - - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - putMappingTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_MAPPING_KEY, true); - } static class RefreshTask { @@ -253,11 +246,6 @@ public ClusterTasksResult execute( } } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return putMappingTaskKey; - } - private ClusterState applyRequest( ClusterState currentState, PutMappingClusterStateUpdateRequest request, diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java index 2b5e236bef8c9..4756e625d21cc 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataUpdateSettingsService.java @@ -47,8 +47,6 @@ import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.AwarenessReplicaBalance; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.ValidationException; @@ -91,7 +89,6 @@ public class MetadataUpdateSettingsService { private final IndicesService indicesService; private final ShardLimitValidator shardLimitValidator; private final ThreadPool threadPool; - private final ClusterManagerTaskThrottler.ThrottlingKey updateSettingsTaskKey; private AwarenessReplicaBalance awarenessReplicaBalance; @@ -112,9 +109,6 @@ public MetadataUpdateSettingsService( this.indicesService = indicesService; this.shardLimitValidator = shardLimitValidator; this.awarenessReplicaBalance = awarenessReplicaBalance; - - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - updateSettingsTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.UPDATE_SETTINGS_KEY, true); } public void updateSettings( @@ -168,11 +162,6 @@ protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { return new ClusterStateUpdateResponse(acknowledged); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return updateSettingsTaskKey; - } - @Override public ClusterState execute(ClusterState currentState) { diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java deleted file mode 100644 index 0743997c23c9a..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskKeys.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.service; - -/** - * Class for maintaining all cluster manager task key at one place. - */ -public final class ClusterManagerTaskKeys { - - public static final String CREATE_INDEX_KEY = "create-index"; - public static final String UPDATE_SETTINGS_KEY = "update-settings"; - public static final String CLUSTER_UPDATE_SETTINGS_KEY = "cluster-update-settings"; - public static final String AUTO_CREATE_KEY = "auto-create"; - public static final String DELETE_INDEX_KEY = "delete-index"; - public static final String DELETE_DANGLING_INDEX_KEY = "delete-dangling-index"; - public static final String CREATE_DATA_STREAM_KEY = "create-data-stream"; - public static final String REMOVE_DATA_STREAM_KEY = "remove-data-stream"; - public static final String ROLLOVER_INDEX_KEY = "rollover-index"; - public static final String INDEX_ALIASES_KEY = "index-aliases"; - public static final String PUT_MAPPING_KEY = "put-mapping"; - public static final String CREATE_INDEX_TEMPLATE_KEY = "create-index-template"; - public static final String REMOVE_INDEX_TEMPLATE_KEY = "remove-index-template"; - public static final String CREATE_COMPONENT_TEMPLATE_KEY = "create-component-template"; - public static final String REMOVE_COMPONENT_TEMPLATE_KEY = "remove-component-template"; - public static final String CREATE_INDEX_TEMPLATE_V2_KEY = "create-index-template-v2"; - public static final String REMOVE_INDEX_TEMPLATE_V2_KEY = "remove-index-template-v2"; - public static final String PUT_PIPELINE_KEY = "put-pipeline"; - public static final String DELETE_PIPELINE_KEY = "delete-pipeline"; - public static final String CREATE_PERSISTENT_TASK_KEY = "create-persistent-task"; - public static final String FINISH_PERSISTENT_TASK_KEY = "finish-persistent-task"; - public static final String REMOVE_PERSISTENT_TASK_KEY = "remove-persistent-task"; - public static final String UPDATE_TASK_STATE_KEY = "update-task-state"; - public static final String PUT_SCRIPT_KEY = "put-script"; - public static final String DELETE_SCRIPT_KEY = "delete-script"; - public static final String PUT_REPOSITORY_KEY = "put-repository"; - public static final String DELETE_REPOSITORY_KEY = "delete-repository"; - public static final String CREATE_SNAPSHOT_KEY = "create-snapshot"; - public static final String DELETE_SNAPSHOT_KEY = "delete-snapshot"; - public static final String UPDATE_SNAPSHOT_STATE_KEY = "update-snapshot-state"; - public static final String RESTORE_SNAPSHOT_KEY = "restore-snapshot"; - public static final String CLUSTER_REROUTE_API_KEY = "cluster-reroute-api"; - -} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java deleted file mode 100644 index 0503db713258d..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.service; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.Version; -import org.opensearch.cluster.ClusterStateTaskExecutor; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; - -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.function.Supplier; - -/** - * This class does throttling on task submission to cluster manager node, it uses throttling key defined in various executors - * as key for throttling. Throttling will be performed over task executor's class level, different task types have different executors class. - * - * Set specific setting to for setting the threshold of throttling of particular task type. - * e.g : Set "cluster_manager.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks, - * Set it to default value(-1) to disable the throttling for this task type. - */ -public class ClusterManagerTaskThrottler implements TaskBatcherListener { - private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class); - public static final ThrottlingKey DEFAULT_THROTTLING_KEY = new ThrottlingKey("default-task-key", false); - - public static final Setting THRESHOLD_SETTINGS = Setting.groupSetting( - "cluster_manager.throttling.thresholds.", - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - protected Map THROTTLING_TASK_KEYS = new ConcurrentHashMap<>(); - - private final int MIN_THRESHOLD_VALUE = -1; // Disabled throttling - private final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener; - - private final ConcurrentMap tasksCount; - private final ConcurrentMap tasksThreshold; - private final Supplier minNodeVersionSupplier; - - public ClusterManagerTaskThrottler( - final ClusterSettings clusterSettings, - final Supplier minNodeVersionSupplier, - final ClusterManagerTaskThrottlerListener clusterManagerTaskThrottlerListener - ) { - clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTINGS, this::updateSetting, this::validateSetting); - this.minNodeVersionSupplier = minNodeVersionSupplier; - this.clusterManagerTaskThrottlerListener = clusterManagerTaskThrottlerListener; - tasksCount = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment - tasksThreshold = new ConcurrentHashMap<>(128); // setting initial capacity so each task will land in different segment - } - - /** - * To configure a new task for throttling, - * * Register task to cluster service with task key, - * * override getClusterManagerThrottlingKey method with above task key in task executor. - * * Verify that throttled tasks would be retried from data nodes - * - * Added retry mechanism in TransportClusterManagerNodeAction, so it would be retried for customer generated tasks. - * - * If tasks are not getting retried then we can register with false flag, so user won't be able to configure threshold limits for it. - */ - protected ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) { - ThrottlingKey throttlingKey = new ThrottlingKey(taskKey, throttlingEnabled); - if (THROTTLING_TASK_KEYS.containsKey(taskKey)) { - throw new IllegalArgumentException("There is already a Throttling key registered with same name: " + taskKey); - } - THROTTLING_TASK_KEYS.put(taskKey, throttlingKey); - return throttlingKey; - } - - /** - * Class to store the throttling key for the tasks of cluster manager - */ - public static class ThrottlingKey { - private String taskThrottlingKey; - private boolean throttlingEnabled; - - /** - * Class for throttling key of tasks - * - * @param taskThrottlingKey - throttling key for task - * @param throttlingEnabled - if throttling is enabled or not i.e. data node is performing retry over throttling exception or not. - */ - private ThrottlingKey(String taskThrottlingKey, boolean throttlingEnabled) { - this.taskThrottlingKey = taskThrottlingKey; - this.throttlingEnabled = throttlingEnabled; - } - - public String getTaskThrottlingKey() { - return taskThrottlingKey; - } - - public boolean isThrottlingEnabled() { - return throttlingEnabled; - } - } - - void validateSetting(final Settings settings) { - if (minNodeVersionSupplier.get().compareTo(Version.V_2_4_0) < 0) { - throw new IllegalArgumentException("All the nodes in cluster should be on version later than or equal to 2.4.0"); - } - Map groups = settings.getAsGroups(); - for (String key : groups.keySet()) { - if (!THROTTLING_TASK_KEYS.containsKey(key)) { - throw new IllegalArgumentException("Cluster manager task throttling is not configured for given task type: " + key); - } - if (!THROTTLING_TASK_KEYS.get(key).isThrottlingEnabled()) { - throw new IllegalArgumentException("Throttling is not enabled for given task type: " + key); - } - int threshold = groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE); - if (threshold < MIN_THRESHOLD_VALUE) { - throw new IllegalArgumentException("Provide positive integer for limit or -1 for disabling throttling"); - } - } - } - - void updateSetting(final Settings settings) { - Map groups = settings.getAsGroups(); - for (String key : groups.keySet()) { - updateLimit(key, groups.get(key).getAsInt("value", MIN_THRESHOLD_VALUE)); - } - } - - void updateLimit(final String taskKey, final int limit) { - assert limit >= MIN_THRESHOLD_VALUE; - if (limit == MIN_THRESHOLD_VALUE) { - tasksThreshold.remove(taskKey); - } else { - tasksThreshold.put(taskKey, (long) limit); - } - } - - Long getThrottlingLimit(final String taskKey) { - return tasksThreshold.get(taskKey); - } - - @Override - public void onBeginSubmit(List tasks) { - ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey) - .getClusterManagerThrottlingKey(); - tasksCount.putIfAbsent(clusterManagerThrottlingKey.getTaskThrottlingKey(), 0L); - tasksCount.computeIfPresent(clusterManagerThrottlingKey.getTaskThrottlingKey(), (key, count) -> { - int size = tasks.size(); - if (clusterManagerThrottlingKey.isThrottlingEnabled()) { - Long threshold = tasksThreshold.get(clusterManagerThrottlingKey.getTaskThrottlingKey()); - if (threshold != null && (count + size > threshold)) { - clusterManagerTaskThrottlerListener.onThrottle(clusterManagerThrottlingKey.getTaskThrottlingKey(), size); - logger.warn( - "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", - clusterManagerThrottlingKey.getTaskThrottlingKey(), - tasks.size(), - threshold - ); - throw new ClusterManagerThrottlingException( - "Throttling Exception : Limit exceeded for " + clusterManagerThrottlingKey.getTaskThrottlingKey() - ); - } - } - return count + size; - }); - } - - @Override - public void onSubmitFailure(List tasks) { - reduceTaskCount(tasks); - } - - /** - * Tasks will be removed from the queue before processing, so here we will reduce the count of tasks. - * - * @param tasks list of tasks which will be executed. - */ - @Override - public void onBeginProcessing(List tasks) { - reduceTaskCount(tasks); - } - - @Override - public void onTimeout(List tasks) { - reduceTaskCount(tasks); - } - - private void reduceTaskCount(List tasks) { - String masterTaskKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey).getClusterManagerThrottlingKey() - .getTaskThrottlingKey(); - tasksCount.computeIfPresent(masterTaskKey, (key, count) -> count - tasks.size()); - } -} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerListener.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerListener.java deleted file mode 100644 index 9d41f4d39b09f..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerListener.java +++ /dev/null @@ -1,16 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.service; - -/** - * Listener interface for master task throttling - */ -public interface ClusterManagerTaskThrottlerListener { - void onThrottle(String type, final int counts); -} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java deleted file mode 100644 index 4e2ab2037f548..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.service; - -import org.opensearch.OpenSearchException; -import org.opensearch.common.io.stream.StreamInput; - -import java.io.IOException; - -/** - * Exception raised from cluster manager node due to task throttling. - */ -public class ClusterManagerThrottlingException extends OpenSearchException { - - public ClusterManagerThrottlingException(String msg, Object... args) { - super(msg, args); - } - - public ClusterManagerThrottlingException(StreamInput in) throws IOException { - super(in); - } -} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingStats.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingStats.java deleted file mode 100644 index fe4eb20902723..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerThrottlingStats.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.service; - -import org.opensearch.common.metrics.CounterMetric; - -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Contains stats of Cluster Manager Task Throttling. - * It stores the total cumulative count of throttled tasks per task type. - */ -public class ClusterManagerThrottlingStats implements ClusterManagerTaskThrottlerListener { - - private Map throttledTasksCount = new ConcurrentHashMap<>(); - - private void incrementThrottlingCount(String type, final int counts) { - throttledTasksCount.computeIfAbsent(type, k -> new CounterMetric()).inc(counts); - } - - public long getThrottlingCount(String type) { - return throttledTasksCount.get(type) == null ? 0 : throttledTasksCount.get(type).count(); - } - - public long getTotalThrottledTaskCount() { - CounterMetric totalCount = new CounterMetric(); - throttledTasksCount.forEach((aClass, counterMetric) -> { totalCount.inc(counterMetric.count()); }); - return totalCount.count(); - } - - @Override - public void onThrottle(String type, int counts) { - incrementThrottlingCount(type, counts); - } -} diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java index a605c41bdeff8..d393613118af8 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterService.java @@ -291,17 +291,6 @@ public final String getNodeName() { return nodeName; } - /** - * Functionality for register task key to cluster manager node. - * - * @param taskKey - task key of task - * @param throttlingEnabled - throttling is enabled for task or not i.e does data node perform retries on it or not - * @return throttling task key which needs to be passed while submitting task to cluster manager - */ - public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) { - return clusterManagerService.registerClusterManagerTask(taskKey, throttlingEnabled); - } - /** * Submits a cluster state update task; unlike {@link #submitStateUpdateTask(String, Object, ClusterStateTaskConfig, * ClusterStateTaskExecutor, ClusterStateTaskListener)}, submitted updates will not be batched. diff --git a/server/src/main/java/org/opensearch/cluster/service/MasterService.java b/server/src/main/java/org/opensearch/cluster/service/MasterService.java index f78e2c760ebb3..b78707e994855 100644 --- a/server/src/main/java/org/opensearch/cluster/service/MasterService.java +++ b/server/src/main/java/org/opensearch/cluster/service/MasterService.java @@ -36,7 +36,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Assertions; -import org.opensearch.Version; import org.opensearch.action.support.PlainActionFuture; import org.opensearch.cluster.AckedClusterStateTaskListener; import org.opensearch.cluster.ClusterChangedEvent; @@ -128,8 +127,6 @@ public class MasterService extends AbstractLifecycleComponent { private volatile PrioritizedOpenSearchThreadPoolExecutor threadPoolExecutor; private volatile Batcher taskBatcher; - protected final ClusterManagerTaskThrottler clusterManagerTaskThrottler; - private final ClusterManagerThrottlingStats throttlingStats; public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { this.nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings)); @@ -140,8 +137,6 @@ public MasterService(Settings settings, ClusterSettings clusterSettings, ThreadP this::setSlowTaskLoggingThreshold ); - this.throttlingStats = new ClusterManagerThrottlingStats(); - this.clusterManagerTaskThrottler = new ClusterManagerTaskThrottler(clusterSettings, this::getMinNodeVersion, throttlingStats); this.threadPool = threadPool; } @@ -162,7 +157,7 @@ protected synchronized void doStart() { Objects.requireNonNull(clusterStatePublisher, "please set a cluster state publisher before starting"); Objects.requireNonNull(clusterStateSupplier, "please set a cluster state supplier before starting"); threadPoolExecutor = createThreadPoolExecutor(); - taskBatcher = new Batcher(logger, threadPoolExecutor, clusterManagerTaskThrottler); + taskBatcher = new Batcher(logger, threadPoolExecutor); } protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { @@ -177,8 +172,8 @@ protected PrioritizedOpenSearchThreadPoolExecutor createThreadPoolExecutor() { @SuppressWarnings("unchecked") class Batcher extends TaskBatcher { - Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) { - super(logger, threadExecutor, taskBatcherListener); + Batcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor) { + super(logger, threadExecutor); } @Override @@ -594,20 +589,6 @@ public List pendingTasks() { }).collect(Collectors.toList()); } - /** - * Returns the number of throttled pending tasks. - */ - public long numberOfThrottledPendingTasks() { - return throttlingStats.getTotalThrottledTaskCount(); - } - - /** - * Returns the min version of nodes in cluster - */ - public Version getMinNodeVersion() { - return state().getNodes().getMinNodeVersion(); - } - /** * Returns the number of currently pending tasks. */ @@ -934,17 +915,6 @@ void onNoLongerClusterManager() { } } - /** - * Functionality for register task key to cluster manager node. - * - * @param taskKey - task key of task - * @param throttlingEnabled - throttling is enabled for task or not i.e does data node perform retries on it or not - * @return throttling task key which needs to be passed while submitting task to cluster manager - */ - public ClusterManagerTaskThrottler.ThrottlingKey registerClusterManagerTask(String taskKey, boolean throttlingEnabled) { - return clusterManagerTaskThrottler.registerClusterManagerTask(taskKey, throttlingEnabled); - } - /** * Submits a batch of cluster state update tasks; submitted updates are guaranteed to be processed together, * potentially with more tasks of the same executor. diff --git a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java index b5710bab41172..e04c8617ecd33 100644 --- a/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java +++ b/server/src/main/java/org/opensearch/cluster/service/TaskBatcher.java @@ -62,12 +62,10 @@ public abstract class TaskBatcher { private final PrioritizedOpenSearchThreadPoolExecutor threadExecutor; // package visible for tests final Map> tasksPerBatchingKey = new HashMap<>(); - private final TaskBatcherListener taskBatcherListener; - public TaskBatcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor, TaskBatcherListener taskBatcherListener) { + public TaskBatcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor) { this.logger = logger; this.threadExecutor = threadExecutor; - this.taskBatcherListener = taskBatcherListener; } public void submitTasks(List tasks, @Nullable TimeValue timeout) throws OpenSearchRejectedExecutionException { @@ -77,46 +75,36 @@ public void submitTasks(List tasks, @Nullable TimeValue t final BatchedTask firstTask = tasks.get(0); assert tasks.stream().allMatch(t -> t.batchingKey == firstTask.batchingKey) : "tasks submitted in a batch should share the same batching key: " + tasks; - assert tasks.stream().allMatch(t -> t.getTask().getClass() == firstTask.getTask().getClass()) - : "tasks submitted in a batch should be of same class: " + tasks; - - taskBatcherListener.onBeginSubmit(tasks); - - try { - // convert to an identity map to check for dups based on task identity - final Map tasksIdentity = tasks.stream() - .collect( - Collectors.toMap( - BatchedTask::getTask, - Function.identity(), - (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); }, - IdentityHashMap::new - ) - ); - - synchronized (tasksPerBatchingKey) { - LinkedHashSet existingTasks = tasksPerBatchingKey.computeIfAbsent( - firstTask.batchingKey, - k -> new LinkedHashSet<>(tasks.size()) - ); - for (BatchedTask existing : existingTasks) { - // check that there won't be two tasks with the same identity for the same batching key - BatchedTask duplicateTask = tasksIdentity.get(existing.getTask()); - if (duplicateTask != null) { - throw new IllegalStateException( - "task [" - + duplicateTask.describeTasks(Collections.singletonList(existing)) - + "] with source [" - + duplicateTask.source - + "] is already queued" - ); - } + // convert to an identity map to check for dups based on task identity + final Map tasksIdentity = tasks.stream() + .collect( + Collectors.toMap( + BatchedTask::getTask, + Function.identity(), + (a, b) -> { throw new IllegalStateException("cannot add duplicate task: " + a); }, + IdentityHashMap::new + ) + ); + + synchronized (tasksPerBatchingKey) { + LinkedHashSet existingTasks = tasksPerBatchingKey.computeIfAbsent( + firstTask.batchingKey, + k -> new LinkedHashSet<>(tasks.size()) + ); + for (BatchedTask existing : existingTasks) { + // check that there won't be two tasks with the same identity for the same batching key + BatchedTask duplicateTask = tasksIdentity.get(existing.getTask()); + if (duplicateTask != null) { + throw new IllegalStateException( + "task [" + + duplicateTask.describeTasks(Collections.singletonList(existing)) + + "] with source [" + + duplicateTask.source + + "] is already queued" + ); } - existingTasks.addAll(tasks); } - } catch (Exception e) { - taskBatcherListener.onSubmitFailure(tasks); - throw e; + existingTasks.addAll(tasks); } if (timeout != null) { @@ -148,7 +136,6 @@ private void onTimeoutInternal(List tasks, TimeValue time } } } - taskBatcherListener.onTimeout(toRemove); onTimeout(toRemove, timeout); } } @@ -186,7 +173,6 @@ void runIfNotProcessed(BatchedTask updateTask) { return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]"; }).reduce((s1, s2) -> s1 + ", " + s2).orElse(""); - taskBatcherListener.onBeginProcessing(toExecute); run(updateTask.batchingKey, toExecute, tasksSummary); } } diff --git a/server/src/main/java/org/opensearch/cluster/service/TaskBatcherListener.java b/server/src/main/java/org/opensearch/cluster/service/TaskBatcherListener.java deleted file mode 100644 index 2feaf2540a96e..0000000000000 --- a/server/src/main/java/org/opensearch/cluster/service/TaskBatcherListener.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.service; - -import java.util.List; - -/** - * Listener class for callback on various events of TaskBatcher. - */ -public interface TaskBatcherListener { - /** - * Callback called before submitting tasks. - * @param tasks list of tasks which will be submitted. - */ - void onBeginSubmit(List tasks); - - /** - * Callback called if tasks submission due to any reason - * for e.g. failing due to duplicate tasks. - * @param tasks list of tasks which was failed to submit. - */ - void onSubmitFailure(List tasks); - - /** - * Callback called before processing any tasks. - * @param tasks list of tasks which will be executed. - */ - void onBeginProcessing(List tasks); - - /** - * Callback called when tasks are timed out. - * @param tasks list of tasks which will be executed. - */ - void onTimeout(List tasks); -} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index a46373ad056c3..70a17837839cf 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -92,7 +92,6 @@ import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.cluster.service.ClusterApplierService; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterManagerService; import org.opensearch.common.logging.Loggers; import org.opensearch.common.network.NetworkModule; @@ -592,7 +591,6 @@ public void apply(Settings value, Settings current, Settings previous) { IndexingPressure.MAX_INDEXING_BYTES, TaskResourceTrackingService.TASK_RESOURCE_TRACKING_ENABLED, TaskManager.TASK_RESOURCE_CONSUMERS_ENABLED, - ClusterManagerTaskThrottler.THRESHOLD_SETTINGS, // Settings related to search backpressure SearchBackpressureSettings.SETTING_MODE, diff --git a/server/src/main/java/org/opensearch/ingest/IngestService.java b/server/src/main/java/org/opensearch/ingest/IngestService.java index b9785d9ec036f..b8256fe896da4 100644 --- a/server/src/main/java/org/opensearch/ingest/IngestService.java +++ b/server/src/main/java/org/opensearch/ingest/IngestService.java @@ -56,8 +56,6 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.MetadataIndexTemplateService; import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.Tuple; import org.opensearch.common.regex.Regex; @@ -116,8 +114,6 @@ public class IngestService implements ClusterStateApplier, ReportingService> ingestClusterStateListeners = new CopyOnWriteArrayList<>(); - private final ClusterManagerTaskThrottler.ThrottlingKey putPipelineTaskKey; - private final ClusterManagerTaskThrottler.ThrottlingKey deletePipelineTaskKey; private volatile ClusterState state; public IngestService( @@ -145,11 +141,8 @@ public IngestService( threadPool.generic()::execute ) ); - this.threadPool = threadPool; - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - putPipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_PIPELINE_KEY, true); - deletePipelineTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_PIPELINE_KEY, true); + this.threadPool = threadPool; } private static Map processorFactories(List ingestPlugins, Processor.Parameters parameters) { @@ -298,11 +291,6 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { public ClusterState execute(ClusterState currentState) { return innerDelete(request, currentState); } - - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return deletePipelineTaskKey; - } } ); } @@ -397,11 +385,6 @@ protected AcknowledgedResponse newResponse(boolean acknowledged) { public ClusterState execute(ClusterState currentState) { return innerPut(request, currentState); } - - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return putPipelineTaskKey; - } } ); } diff --git a/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java b/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java index bce3095d0c30d..9dc7f7d7380cc 100644 --- a/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java +++ b/server/src/main/java/org/opensearch/persistent/PersistentTasksClusterService.java @@ -45,8 +45,6 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; @@ -83,10 +81,6 @@ public class PersistentTasksClusterService implements ClusterStateListener, Clos private final EnableAssignmentDecider decider; private final ThreadPool threadPool; private final PeriodicRechecker periodicRechecker; - private final ClusterManagerTaskThrottler.ThrottlingKey createPersistentTaskKey; - private final ClusterManagerTaskThrottler.ThrottlingKey finishPersistentTaskKey; - private final ClusterManagerTaskThrottler.ThrottlingKey removePersistentTaskKey; - private final ClusterManagerTaskThrottler.ThrottlingKey updatePersistentTaskKey; public PersistentTasksClusterService( Settings settings, @@ -104,12 +98,6 @@ public PersistentTasksClusterService( } clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_TASKS_ALLOCATION_RECHECK_INTERVAL_SETTING, this::setRecheckInterval); - - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - createPersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_PERSISTENT_TASK_KEY, true); - finishPersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.FINISH_PERSISTENT_TASK_KEY, true); - removePersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.REMOVE_PERSISTENT_TASK_KEY, true); - updatePersistentTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.UPDATE_TASK_STATE_KEY, true); } // visible for testing only @@ -156,11 +144,6 @@ public ClusterState execute(ClusterState currentState) { return update(currentState, builder.addTask(taskId, taskName, taskParams, assignment)); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return createPersistentTaskKey; - } - @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -220,11 +203,6 @@ public ClusterState execute(ClusterState currentState) { } } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return finishPersistentTaskKey; - } - @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -256,11 +234,6 @@ public ClusterState execute(ClusterState currentState) { } } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return removePersistentTaskKey; - } - @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -304,11 +277,6 @@ public ClusterState execute(ClusterState currentState) { } } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return updatePersistentTaskKey; - } - @Override public void onFailure(String source, Exception e) { listener.onFailure(e); diff --git a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java index 32bdb8b665520..c70c10495b7b5 100644 --- a/server/src/main/java/org/opensearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/opensearch/repositories/RepositoriesService.java @@ -53,8 +53,6 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; import org.opensearch.common.component.AbstractLifecycleComponent; @@ -113,8 +111,6 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C private final Map internalRepositories = ConcurrentCollections.newConcurrentMap(); private volatile Map repositories = Collections.emptyMap(); private final RepositoriesStatsArchive repositoriesStatsArchive; - private final ClusterManagerTaskThrottler.ThrottlingKey putRepositoryTaskKey; - private final ClusterManagerTaskThrottler.ThrottlingKey deleteRepositoryTaskKey; public RepositoriesService( Settings settings, @@ -141,9 +137,6 @@ public RepositoriesService( REPOSITORIES_STATS_ARCHIVE_MAX_ARCHIVED_STATS.get(settings), threadPool::relativeTimeInMillis ); - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - putRepositoryTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.PUT_REPOSITORY_KEY, true); - deleteRepositoryTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_REPOSITORY_KEY, true); } /** @@ -236,11 +229,6 @@ public ClusterState execute(ClusterState currentState) { return ClusterState.builder(currentState).metadata(mdBuilder).build(); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return putRepositoryTaskKey; - } - @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("failed to create repository [{}]", request.name()), e); @@ -302,11 +290,6 @@ public ClusterState execute(ClusterState currentState) { throw new RepositoryMissingException(request.name()); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return deleteRepositoryTaskKey; - } - @Override public boolean mustAck(DiscoveryNode discoveryNode) { // repository was created on both cluster-manager and data nodes diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 50388a1354327..3513cd6d00ce5 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -63,7 +63,6 @@ import org.opensearch.cluster.metadata.RepositoryMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Numbers; @@ -455,11 +454,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS public TimeValue timeout() { return updateTask.timeout(); } - - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return updateTask.getClusterManagerThrottlingKey(); - } }); }, onFailure)); } diff --git a/server/src/main/java/org/opensearch/script/ScriptService.java b/server/src/main/java/org/opensearch/script/ScriptService.java index 0eeb6b38e5b27..303fc5ccbcf88 100644 --- a/server/src/main/java/org/opensearch/script/ScriptService.java +++ b/server/src/main/java/org/opensearch/script/ScriptService.java @@ -45,7 +45,6 @@ import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateApplier; import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; import org.opensearch.common.settings.ClusterSettings; @@ -546,7 +545,6 @@ protected StoredScriptSource getScriptFromClusterState(String id) { public void putStoredScript( ClusterService clusterService, PutStoredScriptRequest request, - ClusterManagerTaskThrottler.ThrottlingKey putStoreTaskKey, ActionListener listener ) { if (request.content().length() > maxSizeInBytes) { @@ -606,11 +604,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { return ClusterState.builder(currentState).metadata(mdb).build(); } - - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return putStoreTaskKey; - } } ); } @@ -618,7 +611,6 @@ public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey( public void deleteStoredScript( ClusterService clusterService, DeleteStoredScriptRequest request, - ClusterManagerTaskThrottler.ThrottlingKey deleteScriptTaskKey, ActionListener listener ) { clusterService.submitStateUpdateTask( @@ -638,11 +630,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { return ClusterState.builder(currentState).metadata(mdb).build(); } - - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return deleteScriptTaskKey; - } } ); } diff --git a/server/src/main/java/org/opensearch/snapshots/RestoreService.java b/server/src/main/java/org/opensearch/snapshots/RestoreService.java index 9a4c3e7dc8ce8..643f176a82221 100644 --- a/server/src/main/java/org/opensearch/snapshots/RestoreService.java +++ b/server/src/main/java/org/opensearch/snapshots/RestoreService.java @@ -75,8 +75,6 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Priority; import org.opensearch.common.UUIDs; @@ -180,8 +178,6 @@ public class RestoreService implements ClusterStateApplier { private final ClusterSettings clusterSettings; - private final ClusterManagerTaskThrottler.ThrottlingKey restoreSnapshotTaskKey; - private static final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor(); public RestoreService( @@ -203,10 +199,6 @@ public RestoreService( } this.clusterSettings = clusterService.getClusterSettings(); this.shardLimitValidator = shardLimitValidator; - - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - restoreSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.RESTORE_SNAPSHOT_KEY, true); - } /** @@ -398,11 +390,6 @@ public void restoreSnapshot(final RestoreSnapshotRequest request, final ActionLi final String restoreUUID = UUIDs.randomBase64UUID(); RestoreInfo restoreInfo = null; - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return restoreSnapshotTaskKey; - } - @Override public ClusterState execute(ClusterState currentState) { RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY); diff --git a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java index a777388218d16..4f672c9813d64 100644 --- a/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/opensearch/snapshots/SnapshotsService.java @@ -77,8 +77,6 @@ import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; -import org.opensearch.cluster.service.ClusterManagerTaskKeys; -import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.Priority; @@ -200,10 +198,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final OngoingRepositoryOperations repositoryOperations = new OngoingRepositoryOperations(); - private final ClusterManagerTaskThrottler.ThrottlingKey createSnapshotTaskKey; - private final ClusterManagerTaskThrottler.ThrottlingKey deleteSnapshotTaskKey; - private static ClusterManagerTaskThrottler.ThrottlingKey updateSnapshotStateTaskKey; - /** * Setting that specifies the maximum number of allowed concurrent snapshot create and delete operations in the * cluster state. The number of concurrent operations in a cluster state is defined as the sum of the sizes of @@ -248,11 +242,6 @@ public SnapshotsService( clusterService.getClusterSettings() .addSettingsUpdateConsumer(MAX_CONCURRENT_SNAPSHOT_OPERATIONS_SETTING, i -> maxConcurrentOperations = i); } - - // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. - createSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_SNAPSHOT_KEY, true); - deleteSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.DELETE_SNAPSHOT_KEY, true); - updateSnapshotStateTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.UPDATE_SNAPSHOT_STATE_KEY, true); } /** @@ -538,11 +527,6 @@ public void onFailure(String source, Exception e) { listener.onFailure(e); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return createSnapshotTaskKey; - } - @Override public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { try { @@ -2289,11 +2273,6 @@ public ClusterState execute(ClusterState currentState) throws Exception { .build(); } - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return deleteSnapshotTaskKey; - } - @Override public void onFailure(String source, Exception e) { listener.onFailure(e); @@ -3260,20 +3239,7 @@ public boolean assertAllListenersResolved() { * * Package private to allow for tests. */ - static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = new ClusterStateTaskExecutor() { - @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) - throws Exception { - return shardStateExecutor.execute(currentState, tasks); - } - - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return updateSnapshotStateTaskKey; - } - }; - - static final ClusterStateTaskExecutor shardStateExecutor = (currentState, tasks) -> { + static final ClusterStateTaskExecutor SHARD_STATE_EXECUTOR = (currentState, tasks) -> { int changedCount = 0; int startedCount = 0; final List entries = new ArrayList<>(); diff --git a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java index 58ea9b1c29467..ff2bb77531486 100644 --- a/server/src/test/java/org/opensearch/ExceptionSerializationTests.java +++ b/server/src/test/java/org/opensearch/ExceptionSerializationTests.java @@ -56,7 +56,6 @@ import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.cluster.routing.TestShardRouting; -import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.common.ParsingException; import org.opensearch.common.Strings; import org.opensearch.common.UUIDs; @@ -865,7 +864,6 @@ public void testIds() { ids.put(162, PrimaryShardClosedException.class); ids.put(163, DecommissioningFailedException.class); ids.put(164, NodeDecommissionedException.class); - ids.put(165, ClusterManagerThrottlingException.class); Map, Integer> reverse = new HashMap<>(); for (Map.Entry> entry : ids.entrySet()) { diff --git a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java index 2f9ae9a154f46..1b7d848b626fe 100644 --- a/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/BackoffPolicyTests.java @@ -75,45 +75,4 @@ public void testWrapBackoffPolicy() { assertEquals(expectedRetries, retries.get()); } } - - public void testEqualJitterExponentialBackOffPolicy() { - int baseDelay = 10; - int maxDelay = 10000; - BackoffPolicy policy = BackoffPolicy.exponentialEqualJitterBackoff(baseDelay, maxDelay); - Iterator iterator = policy.iterator(); - - // Assert equal jitter - int retriesTillMaxDelay = 10; - for (int i = 0; i < retriesTillMaxDelay; i++) { - TimeValue delay = iterator.next(); - assertTrue(delay.getMillis() >= baseDelay * (1L << i) / 2); - assertTrue(delay.getMillis() <= baseDelay * (1L << i)); - } - - // Now policy should return max delay for next retries. - int retriesAfterMaxDelay = randomInt(10); - for (int i = 0; i < retriesAfterMaxDelay; i++) { - TimeValue delay = iterator.next(); - assertTrue(delay.getMillis() >= maxDelay / 2); - assertTrue(delay.getMillis() <= maxDelay); - } - } - - public void testExponentialBackOffPolicy() { - long baseDelay = 10; - int maxDelay = 10000; - long currentDelay = baseDelay; - BackoffPolicy policy = BackoffPolicy.exponentialFullJitterBackoff(baseDelay); - Iterator iterator = policy.iterator(); - - // Assert equal jitter - int numberOfRetries = randomInt(20); - - for (int i = 0; i < numberOfRetries; i++) { - TimeValue delay = iterator.next(); - assertTrue(delay.getMillis() >= 0); - assertTrue(delay.getMillis() <= currentDelay); - currentDelay = currentDelay * 2; - } - } } diff --git a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java index c45bae224dbd6..1195ed2590b1e 100644 --- a/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java +++ b/server/src/test/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeActionTests.java @@ -52,7 +52,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodeRole; import org.opensearch.cluster.node.DiscoveryNodes; -import org.opensearch.cluster.service.ClusterManagerThrottlingException; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -81,9 +80,6 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.BrokenBarrierException; -import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.atomic.AtomicBoolean; import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.opensearch.test.ClusterServiceUtils.setState; @@ -610,102 +606,4 @@ public void testDelegateToClusterManagerOnNodeWithDeprecatedMasterRole() throws assertTrue(listener.isDone()); assertThat(listener.get(), equalTo(response)); } - - public void testThrottlingRetryLocalMaster() throws InterruptedException, BrokenBarrierException { - Request request = new Request(); - PlainActionFuture listener = new PlainActionFuture<>(); - AtomicBoolean exception = new AtomicBoolean(true); - AtomicBoolean retried = new AtomicBoolean(false); - CyclicBarrier barrier = new CyclicBarrier(2); - setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[] { localNode })); - - TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) { - @Override - protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) { - if (exception.getAndSet(false)) { - throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for test"); - } else { - try { - retried.set(true); - barrier.await(); - } catch (Exception e) { - throw new AssertionError(); - } - } - } - }; - action.execute(request, listener); - - barrier.await(); - assertTrue(retried.get()); - assertFalse(exception.get()); - } - - public void testThrottlingRetryRemoteMaster() throws ExecutionException, InterruptedException { - Request request = new Request().clusterManagerNodeTimeout(TimeValue.timeValueSeconds(60)); - DiscoveryNode masterNode = this.remoteNode; - setState( - clusterService, - // use a random base version so it can go down when simulating a restart. - ClusterState.builder(ClusterStateCreationUtils.state(localNode, masterNode, new DiscoveryNode[] { localNode, masterNode })) - .version(randomIntBetween(0, 10)) - ); - - PlainActionFuture listener = new PlainActionFuture<>(); - TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool); - action.execute(request, listener); - - CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear(); - assertThat(capturedRequests.length, equalTo(1)); - CapturingTransport.CapturedRequest capturedRequest = capturedRequests[0]; - assertTrue(capturedRequest.node.isMasterNode()); - assertThat(capturedRequest.request, equalTo(request)); - assertThat(capturedRequest.action, equalTo("internal:testAction")); - transport.handleRemoteError( - capturedRequest.requestId, - new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for test") - ); - - assertFalse(listener.isDone()); - - // waiting for retry to trigger - Thread.sleep(100); - - // Retry for above throttling exception - capturedRequests = transport.getCapturedRequestsAndClear(); - assertThat(capturedRequests.length, equalTo(1)); - capturedRequest = capturedRequests[0]; - Response response = new Response(); - transport.handleResponse(capturedRequest.requestId, response); - - assertTrue(listener.isDone()); - listener.get(); - } - - public void testRetryForDifferentException() throws InterruptedException, BrokenBarrierException { - Request request = new Request(); - PlainActionFuture listener = new PlainActionFuture<>(); - AtomicBoolean exception = new AtomicBoolean(true); - AtomicBoolean retried = new AtomicBoolean(false); - CyclicBarrier barrier = new CyclicBarrier(2); - setState(clusterService, ClusterStateCreationUtils.state(localNode, localNode, new DiscoveryNode[] { localNode })); - - TransportClusterManagerNodeAction action = new Action("internal:testAction", transportService, clusterService, threadPool) { - @Override - protected void masterOperation(Task task, Request request, ClusterState state, ActionListener listener) - throws Exception { - if (exception.getAndSet(false)) { - throw new Exception("Different exception"); - } else { - // If called second time due to retry, throw exception - retried.set(true); - throw new AssertionError("Should not retry for other exception"); - } - } - }; - action.execute(request, listener); - - assertFalse(retried.get()); - assertFalse(exception.get()); - } } diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java index 5caea9f5bf674..c374392fc3d0e 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataDeleteIndexServiceTests.java @@ -39,7 +39,6 @@ import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.AllocationService; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; @@ -84,7 +83,7 @@ public void setUp() throws Exception { when(allocationService.reroute(any(ClusterState.class), any(String.class))).thenAnswer( mockInvocation -> mockInvocation.getArguments()[0] ); - service = new MetadataDeleteIndexService(Settings.EMPTY, mock(ClusterService.class), allocationService); + service = new MetadataDeleteIndexService(Settings.EMPTY, null, allocationService); } public void testDeleteMissing() { diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexAliasesServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexAliasesServiceTests.java index f3c7e73e419db..d37756dabfe8d 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexAliasesServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexAliasesServiceTests.java @@ -35,7 +35,6 @@ import org.opensearch.Version; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.set.Sets; import org.opensearch.index.Index; @@ -67,7 +66,7 @@ public class MetadataIndexAliasesServiceTests extends OpenSearchTestCase { private final AliasValidator aliasValidator = new AliasValidator(); private final MetadataDeleteIndexService deleteIndexService = mock(MetadataDeleteIndexService.class); private final MetadataIndexAliasesService service = new MetadataIndexAliasesService( - mock(ClusterService.class), + null, null, aliasValidator, deleteIndexService, diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java index cbd56c8d05116..aa1c500030bd6 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataIndexTemplateServiceTests.java @@ -56,6 +56,7 @@ import org.opensearch.index.mapper.MapperParsingException; import org.opensearch.index.mapper.MapperService; import org.opensearch.indices.IndexTemplateMissingException; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.InvalidIndexTemplateException; import org.opensearch.indices.SystemIndices; import org.opensearch.test.OpenSearchSingleNodeTestCase; @@ -2109,7 +2110,7 @@ private static List putTemplate(NamedXContentRegistry xContentRegistr new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()) ); MetadataIndexTemplateService service = new MetadataIndexTemplateService( - clusterService, + null, createIndexService, new AliasValidator(), null, @@ -2154,7 +2155,31 @@ public void onFailure(Exception e) { } private MetadataIndexTemplateService getMetadataIndexTemplateService() { - return getInstanceFromNode(MetadataIndexTemplateService.class); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + MetadataCreateIndexService createIndexService = new MetadataCreateIndexService( + Settings.EMPTY, + clusterService, + indicesService, + null, + null, + createTestShardLimitService(randomIntBetween(1, 1000), false), + new Environment(builder().put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toString()).build(), null), + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + null, + xContentRegistry(), + new SystemIndices(Collections.emptyMap()), + true, + new AwarenessReplicaBalance(Settings.EMPTY, clusterService.getClusterSettings()) + ); + return new MetadataIndexTemplateService( + clusterService, + createIndexService, + new AliasValidator(), + indicesService, + new IndexScopedSettings(Settings.EMPTY, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS), + xContentRegistry() + ); } @SuppressWarnings("unchecked") diff --git a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java b/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java deleted file mode 100644 index d20fed5c37361..0000000000000 --- a/server/src/test/java/org/opensearch/cluster/service/ClusterManagerTaskThrottlerTests.java +++ /dev/null @@ -1,366 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.cluster.service; - -import org.opensearch.Version; -import org.opensearch.action.support.replication.ClusterStateCreationUtils; -import org.opensearch.cluster.ClusterChangedEvent; -import org.opensearch.cluster.ClusterState; -import org.opensearch.cluster.ClusterStateTaskExecutor; -import org.opensearch.cluster.node.DiscoveryNode; -import org.opensearch.cluster.node.DiscoveryNodeRole; -import org.opensearch.common.Priority; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.test.ClusterServiceUtils; -import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; - -import static org.opensearch.test.ClusterServiceUtils.setState; - -/** - * Contains tests for {@link ClusterManagerTaskThrottler} - */ -public class ClusterManagerTaskThrottlerTests extends OpenSearchTestCase { - - private static ThreadPool threadPool; - private ClusterService clusterService; - private DiscoveryNode localNode; - private DiscoveryNode[] allNodes; - private ClusterManagerThrottlingStats throttlingStats; - - @BeforeClass - public static void beforeClass() { - threadPool = new TestThreadPool("TransportMasterNodeActionTests"); - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - clusterService = ClusterServiceUtils.createClusterService(threadPool); - localNode = new DiscoveryNode( - "local_node", - buildNewFakeTransportAddress(), - Collections.emptyMap(), - Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), - Version.V_2_4_0 - ); - allNodes = new DiscoveryNode[] { localNode }; - throttlingStats = new ClusterManagerThrottlingStats(); - } - - @After - public void tearDown() throws Exception { - super.tearDown(); - clusterService.close(); - } - - @AfterClass - public static void afterClass() { - ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS); - threadPool = null; - } - - public void testDefaults() { - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( - clusterSettings, - () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats - ); - throttler.registerClusterManagerTask("put-mapping", true); - throttler.registerClusterManagerTask("create-index", true); - for (String key : throttler.THROTTLING_TASK_KEYS.keySet()) { - assertNull(throttler.getThrottlingLimit(key)); - } - } - - public void testValidateSettingsForDifferentVersion() { - DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); - DiscoveryNode dataNode = getDataNode(Version.V_2_0_0); - setState( - clusterService, - ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) - ); - - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( - clusterSettings, - () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats - ); - throttler.registerClusterManagerTask("put-mapping", true); - - // set some limit for update snapshot tasks - int newLimit = randomIntBetween(1, 10); - - Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build(); - assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); - } - - public void testValidateSettingsForTaskWihtoutRetryOnDataNode() { - DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); - DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); - setState( - clusterService, - ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) - ); - - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( - clusterSettings, - () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats - ); - throttler.registerClusterManagerTask("put-mapping", false); - - // set some limit for update snapshot tasks - int newLimit = randomIntBetween(1, 10); - - Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build(); - assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); - } - - public void testValidateSettingsForUnknownTask() { - DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); - DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); - setState( - clusterService, - ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) - ); - - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( - clusterSettings, - () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats - ); - - // set some limit for update snapshot tasks - int newLimit = randomIntBetween(1, 10); - - Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.random-task.value", newLimit).build(); - assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); - } - - public void testUpdateThrottlingLimitForBasicSanity() { - DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); - DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); - setState( - clusterService, - ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) - ); - - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( - clusterSettings, - () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats - ); - throttler.registerClusterManagerTask("put-mapping", true); - - // set some limit for update snapshot tasks - long newLimit = randomLongBetween(1, 10); - - Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", newLimit).build(); - clusterSettings.applySettings(newSettings); - assertEquals(newLimit, throttler.getThrottlingLimit("put-mapping").intValue()); - - // set update snapshot task limit to default - newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.value", -1).build(); - clusterSettings.applySettings(newSettings); - assertNull(throttler.getThrottlingLimit("put-mapping")); - } - - public void testValidateSettingForLimit() { - DiscoveryNode clusterManagerNode = getClusterManagerNode(Version.V_2_4_0); - DiscoveryNode dataNode = getDataNode(Version.V_2_4_0); - setState( - clusterService, - ClusterStateCreationUtils.state(clusterManagerNode, clusterManagerNode, new DiscoveryNode[] { clusterManagerNode, dataNode }) - ); - - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( - clusterSettings, - () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats - ); - throttler.registerClusterManagerTask("put-mapping", true); - - Settings newSettings = Settings.builder().put("cluster_manager.throttling.thresholds.put-mapping.values", -5).build(); - assertThrows(IllegalArgumentException.class, () -> throttler.validateSetting(newSettings)); - } - - public void testUpdateLimit() { - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( - clusterSettings, - () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats - ); - throttler.registerClusterManagerTask("put-mapping", true); - - throttler.updateLimit("test", 5); - assertEquals(5L, throttler.getThrottlingLimit("test").intValue()); - throttler.updateLimit("test", -1); - assertNull(throttler.getThrottlingLimit("test")); - } - - private DiscoveryNode getDataNode(Version version) { - return new DiscoveryNode( - "local_data_node", - buildNewFakeTransportAddress(), - Collections.emptyMap(), - Collections.singleton(DiscoveryNodeRole.DATA_ROLE), - version - ); - } - - private DiscoveryNode getClusterManagerNode(Version version) { - return new DiscoveryNode( - "local_master_node", - buildNewFakeTransportAddress(), - Collections.emptyMap(), - Collections.singleton(DiscoveryNodeRole.CLUSTER_MANAGER_ROLE), - version - ); - } - - public void testThrottlingForDisabledThrottlingTask() { - String taskKey = "test"; - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( - clusterSettings, - () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats - ); - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, false); - - // adding limit directly in thresholds - throttler.updateLimit(taskKey, 5); - - // adding 10 tasks, should pass as throttling is disabled for task - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 10)); - - // Asserting that there was not any throttling for it - assertEquals(0L, throttlingStats.getThrottlingCount(taskKey)); - } - - public void testThrottling() { - String taskKey = "test"; - ClusterSettings clusterSettings = new ClusterSettings(Settings.builder().build(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); - ClusterManagerTaskThrottler throttler = new ClusterManagerTaskThrottler( - clusterSettings, - () -> { return clusterService.getMasterService().getMinNodeVersion(); }, - throttlingStats - ); - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = throttler.registerClusterManagerTask(taskKey, true); - - throttler.updateLimit(taskKey, 5); - - // adding 3 tasks - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); - - // adding 3 more tasks, these tasks should be throttled - // taskCount in Queue: 3 Threshold: 5 - assertThrows( - ClusterManagerThrottlingException.class, - () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)) - ); - assertEquals(3L, throttlingStats.getThrottlingCount(taskKey)); - - // remove one task - throttler.onBeginProcessing(getMockUpdateTaskList(taskKey, throttlingKey, 1)); - - // add 3 tasks should pass now. - // taskCount in Queue: 2 Threshold: 5 - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 3)); - - // adding one task will throttle - // taskCount in Queue: 5 Threshold: 5 - assertThrows( - ClusterManagerThrottlingException.class, - () -> throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)) - ); - assertEquals(4L, throttlingStats.getThrottlingCount(taskKey)); - - // update limit of threshold 6 - throttler.updateLimit(taskKey, 6); - - // adding one task should pass now - throttler.onBeginSubmit(getMockUpdateTaskList(taskKey, throttlingKey, 1)); - } - - private List getMockUpdateTaskList( - String taskKey, - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey, - int size - ) { - TaskBatcherTests.TestTaskBatcher testTaskBatcher = new TaskBatcherTests.TestTaskBatcher(logger, null); - List taskList = new ArrayList<>(); - - class MockExecutor - implements - TaskExecutorTests.TestExecutor, - ClusterStateTaskExecutor { - - @Override - public ClusterTasksResult execute( - ClusterState currentState, - List tasks - ) throws Exception { - // No Op - return null; - } - - @Override - public boolean runOnlyOnMaster() { - return true; - } - - @Override - public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {} - - @Override - public void execute(List tasks) {} - - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return throttlingKey; - } - - @Override - public String describeTasks(List tasks) { - return taskKey; - } - } - - for (int i = 0; i < size; i++) { - taskList.add(testTaskBatcher.new UpdateTask(Priority.HIGH, taskKey, taskKey, (source, e) -> { - if (!(e instanceof ClusterManagerThrottlingException)) { - throw new AssertionError(e); - } - }, new MockExecutor())); - } - return taskList; - } -} diff --git a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java index fb47cb8e2d65a..a397f295bcaf2 100644 --- a/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/MasterServiceTests.java @@ -692,273 +692,6 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS } } - public void testThrottlingForTaskSubmission() throws InterruptedException { - MasterService masterService = createClusterManagerService(true); - int throttlingLimit = randomIntBetween(1, 10); - int taskId = 1; - final CyclicBarrier barrier = new CyclicBarrier(2); - final CountDownLatch latch = new CountDownLatch(1); - final String taskName = "test"; - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey = masterService.registerClusterManagerTask(taskName, true); - class Task { - private final int id; - - Task(int id) { - this.id = id; - } - } - - class TaskExecutor implements ClusterStateTaskExecutor { - private AtomicInteger published = new AtomicInteger(); - - @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { - latch.countDown(); - barrier.await(); - return ClusterTasksResult.builder().successes(tasks).build(currentState); - } - - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return throttlingKey; - } - - @Override - public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) { - published.incrementAndGet(); - } - } - - masterService.clusterManagerTaskThrottler.updateLimit(taskName, throttlingLimit); - - final ClusterStateTaskListener listener = new ClusterStateTaskListener() { - @Override - public void onFailure(String source, Exception e) {} - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {} - }; - - TaskExecutor executor = new TaskExecutor(); - // submit one task which will be execution, post that will submit throttlingLimit tasks. - try { - masterService.submitStateUpdateTask( - taskName, - new Task(taskId++), - ClusterStateTaskConfig.build(randomFrom(Priority.values())), - executor, - listener - ); - } catch (Exception e) { - throw new AssertionError(e); - } - // wait till task enter in execution. - latch.await(); - - for (int i = 0; i < throttlingLimit; i++) { - try { - masterService.submitStateUpdateTask( - taskName, - new Task(taskId++), - ClusterStateTaskConfig.build(randomFrom(Priority.values())), - executor, - listener - ); - } catch (Exception e) { - throw new AssertionError(e); - } - } - - // we have one task in execution and tasks in queue so next task should throttled. - final AtomicReference assertionRef = new AtomicReference<>(); - try { - masterService.submitStateUpdateTask( - taskName, - new Task(taskId++), - ClusterStateTaskConfig.build(randomFrom(Priority.values())), - executor, - listener - ); - } catch (ClusterManagerThrottlingException e) { - assertionRef.set(e); - } - assertNotNull(assertionRef.get()); - masterService.close(); - } - - public void testThrottlingForMultipleTaskTypes() throws InterruptedException { - MasterService masterService = createClusterManagerService(true); - int throttlingLimitForTask1 = randomIntBetween(1, 5); - int throttlingLimitForTask2 = randomIntBetween(1, 5); - int throttlingLimitForTask3 = randomIntBetween(1, 5); - int numberOfTask1 = randomIntBetween(throttlingLimitForTask1, 10); - int numberOfTask2 = randomIntBetween(throttlingLimitForTask2, 10); - int numberOfTask3 = randomIntBetween(throttlingLimitForTask3, 10); - String task1 = "Task1"; - String task2 = "Task2"; - String task3 = "Task3"; - - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey1 = masterService.registerClusterManagerTask(task1, true); - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey2 = masterService.registerClusterManagerTask(task2, true); - ClusterManagerTaskThrottler.ThrottlingKey throttlingKey3 = masterService.registerClusterManagerTask(task3, true); - class Task {} - class Task1 extends Task {} - class Task2 extends Task {} - class Task3 extends Task {} - - class Task1Executor implements ClusterStateTaskExecutor { - @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { - Thread.sleep(randomInt(1000)); - return ClusterTasksResult.builder().successes(tasks).build(currentState); - } - - @Override - public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {} - - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return throttlingKey1; - } - } - - class Task2Executor implements ClusterStateTaskExecutor { - @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { - Thread.sleep(randomInt(1000)); - return ClusterTasksResult.builder().successes(tasks).build(currentState); - } - - @Override - public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {} - - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return throttlingKey2; - } - } - - class Task3Executor implements ClusterStateTaskExecutor { - @Override - public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { - Thread.sleep(randomInt(1000)); - return ClusterTasksResult.builder().successes(tasks).build(currentState); - } - - @Override - public void clusterStatePublished(ClusterChangedEvent clusterChangedEvent) {} - - @Override - public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey() { - return throttlingKey3; - } - } - - // configuring limits for Task1 and Task3. All task submission of Task2 should pass. - masterService.clusterManagerTaskThrottler.updateLimit(task1, throttlingLimitForTask1); - masterService.clusterManagerTaskThrottler.updateLimit(task3, throttlingLimitForTask3); - final CountDownLatch latch = new CountDownLatch(numberOfTask1 + numberOfTask2 + numberOfTask3); - AtomicInteger throttledTask1 = new AtomicInteger(); - AtomicInteger throttledTask2 = new AtomicInteger(); - AtomicInteger throttledTask3 = new AtomicInteger(); - AtomicInteger succeededTask1 = new AtomicInteger(); - AtomicInteger succeededTask2 = new AtomicInteger(); - AtomicInteger timedOutTask3 = new AtomicInteger(); - - final ClusterStateTaskListener listener = new ClusterStateTaskListener() { - @Override - public void onFailure(String source, Exception e) { - // Task3's timeout should have called this. - assertEquals(task3, source); - timedOutTask3.incrementAndGet(); - latch.countDown(); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (source.equals(task1)) { - succeededTask1.incrementAndGet(); - } else if (source.equals(task2)) { - succeededTask2.incrementAndGet(); - } - latch.countDown(); - } - }; - Task1Executor executor1 = new Task1Executor(); - Task2Executor executor2 = new Task2Executor(); - Task3Executor executor3 = new Task3Executor(); - List threads = new ArrayList(); - for (int i = 0; i < numberOfTask1; i++) { - threads.add(new Thread(new Runnable() { - @Override - public void run() { - try { - masterService.submitStateUpdateTask( - task1, - new Task1(), - ClusterStateTaskConfig.build(randomFrom(Priority.values())), - executor1, - listener - ); - } catch (ClusterManagerThrottlingException e) { - // Exception should be RejactedExecutionException. - throttledTask1.incrementAndGet(); - latch.countDown(); - } - } - })); - } - for (int i = 0; i < numberOfTask2; i++) { - threads.add(new Thread(new Runnable() { - @Override - public void run() { - try { - masterService.submitStateUpdateTask( - task2, - new Task2(), - ClusterStateTaskConfig.build(randomFrom(Priority.values())), - executor2, - listener - ); - } catch (ClusterManagerThrottlingException e) { - throttledTask2.incrementAndGet(); - latch.countDown(); - } - } - })); - } - for (int i = 0; i < numberOfTask3; i++) { - threads.add(new Thread(new Runnable() { - @Override - public void run() { - try { - masterService.submitStateUpdateTask( - task3, - new Task3(), - ClusterStateTaskConfig.build(randomFrom(Priority.values()), new TimeValue(0)), - executor3, - listener - ); - } catch (ClusterManagerThrottlingException e) { - throttledTask3.incrementAndGet(); - latch.countDown(); - } - } - })); - } - for (Thread thread : threads) { - thread.start(); - } - - // await for latch to clear - latch.await(); - assertEquals(numberOfTask1, throttledTask1.get() + succeededTask1.get()); - assertEquals(numberOfTask2, succeededTask2.get()); - assertEquals(0, throttledTask2.get()); - assertEquals(numberOfTask3, throttledTask3.get() + timedOutTask3.get()); - masterService.close(); - } - public void testBlockingCallInClusterStateTaskListenerFails() throws InterruptedException { assumeTrue("assertions must be enabled for this test to work", BaseFuture.class.desiredAssertionStatus()); final CountDownLatch latch = new CountDownLatch(1); diff --git a/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java b/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java index 31018d4cef029..aec2641b355d3 100644 --- a/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java +++ b/server/src/test/java/org/opensearch/cluster/service/TaskBatcherTests.java @@ -71,10 +71,10 @@ public void setUpBatchingTaskExecutor() throws Exception { taskBatcher = new TestTaskBatcher(logger, threadExecutor); } - static class TestTaskBatcher extends TaskBatcher { + class TestTaskBatcher extends TaskBatcher { TestTaskBatcher(Logger logger, PrioritizedOpenSearchThreadPoolExecutor threadExecutor) { - super(logger, threadExecutor, getMockListener()); + super(logger, threadExecutor); } @Override @@ -344,30 +344,6 @@ public void onFailure(String source, Exception e) { latch.await(); } - protected static TaskBatcherListener getMockListener() { - return new TaskBatcherListener() { - @Override - public void onBeginSubmit(List tasks) { - // No Op - } - - @Override - public void onSubmitFailure(List tasks) { - // No Op - } - - @Override - public void onBeginProcessing(List tasks) { - // No Op - } - - @Override - public void onTimeout(List tasks) { - // No Op - } - }; - } - private static class SimpleTask { private final int id; diff --git a/server/src/test/java/org/opensearch/persistent/PersistentTasksClusterServiceTests.java b/server/src/test/java/org/opensearch/persistent/PersistentTasksClusterServiceTests.java index 7659bce456381..7e23e6ef3748c 100644 --- a/server/src/test/java/org/opensearch/persistent/PersistentTasksClusterServiceTests.java +++ b/server/src/test/java/org/opensearch/persistent/PersistentTasksClusterServiceTests.java @@ -201,7 +201,7 @@ public void testReassignmentRequiredOnMetadataChanges() { public void testReassignTasksWithNoTasks() { ClusterState clusterState = initialState(); - assertThat(reassign(createService(), clusterState).metadata().custom(PersistentTasksCustomMetadata.TYPE), nullValue()); + assertThat(reassign(clusterState).metadata().custom(PersistentTasksCustomMetadata.TYPE), nullValue()); } public void testReassignConsidersClusterStateUpdates() { @@ -219,7 +219,7 @@ public void testReassignConsidersClusterStateUpdates() { Metadata.Builder metadata = Metadata.builder(clusterState.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build()); clusterState = builder.metadata(metadata).nodes(nodes).build(); - ClusterState newClusterState = reassign(createService(), clusterState); + ClusterState newClusterState = reassign(clusterState); PersistentTasksCustomMetadata tasksInProgress = newClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); assertThat(tasksInProgress, notNullValue()); @@ -238,8 +238,7 @@ public void testNonClusterStateConditionAssignment() { clusterState = builder.metadata(metadata).nodes(nodes).build(); nonClusterStateCondition = false; - PersistentTasksClusterService service = createService(); - ClusterState newClusterState = reassign(service, clusterState); + ClusterState newClusterState = reassign(clusterState); PersistentTasksCustomMetadata tasksInProgress = newClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); assertThat(tasksInProgress, notNullValue()); @@ -251,7 +250,7 @@ public void testNonClusterStateConditionAssignment() { assertThat(tasksInProgress.tasks().size(), equalTo(1)); nonClusterStateCondition = true; - ClusterState finalClusterState = reassign(service, newClusterState); + ClusterState finalClusterState = reassign(newClusterState); tasksInProgress = finalClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); assertThat(tasksInProgress, notNullValue()); @@ -290,7 +289,7 @@ public void testReassignTasks() { } Metadata.Builder metadata = Metadata.builder(clusterState.metadata()).putCustom(PersistentTasksCustomMetadata.TYPE, tasks.build()); clusterState = builder.metadata(metadata).nodes(nodes).build(); - ClusterState newClusterState = reassign(createService(), clusterState); + ClusterState newClusterState = reassign(clusterState); PersistentTasksCustomMetadata tasksInProgress = newClusterState.getMetadata().custom(PersistentTasksCustomMetadata.TYPE); assertThat(tasksInProgress, notNullValue()); @@ -625,8 +624,8 @@ private void addTestNodes(DiscoveryNodes.Builder nodes, int nonLocalNodesCount) } } - private PersistentTasksClusterService createService() { - return createService((params, currentState) -> { + private ClusterState reassign(ClusterState clusterState) { + PersistentTasksClusterService service = createService((params, currentState) -> { TestParams testParams = (TestParams) params; switch (testParams.getTestParam()) { case "assign_me": @@ -645,9 +644,7 @@ private PersistentTasksClusterService createService() { } return NO_NODE_FOUND; }); - } - private ClusterState reassign(PersistentTasksClusterService service, ClusterState clusterState) { return service.reassignTasks(clusterState); }