From 7139740ace0842a63df4416d0616fc10ae4b8c2b Mon Sep 17 00:00:00 2001 From: rajiv-kv <157019998+rajiv-kv@users.noreply.github.com> Date: Thu, 21 Mar 2024 21:32:48 +0530 Subject: [PATCH] Integrate with CPU admission controller for cluster-manager Read API's. (#12496) (#12829) * Integrate with CPU admission controller for cluster-manager Read API's. The admission control is enforced at the transport layer. Signed-off-by: Rajiv Kumar Vaidyanathan --- CHANGELOG.md | 1 + .../AdmissionForClusterManagerIT.java | 198 ++++++++++++++++++ .../support/HandledTransportAction.java | 37 +++- .../TransportClusterManagerNodeAction.java | 29 ++- ...TransportClusterManagerNodeReadAction.java | 38 +++- .../common/settings/ClusterSettings.java | 2 + .../CpuBasedAdmissionController.java | 5 +- .../IoBasedAdmissionController.java | 5 +- .../enums/AdmissionControlActionType.java | 5 +- .../CpuBasedAdmissionControllerSettings.java | 21 ++ .../IoBasedAdmissionControllerSettings.java | 18 ++ .../transport/TransportService.java | 6 +- ...BasedAdmissionControllerSettingsTests.java | 32 ++- ...BasedAdmissionControllerSettingsTests.java | 16 ++ 14 files changed, 402 insertions(+), 11 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java diff --git a/CHANGELOG.md b/CHANGELOG.md index b04a074a20be6..c368cd2f88562 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Introduce a new setting `index.check_pending_flush.enabled` to expose the ability to disable the check for pending flushes by write threads ([#12710](https://github.com/opensearch-project/OpenSearch/pull/12710)) - Built-in secure transports support ([#12435](https://github.com/opensearch-project/OpenSearch/pull/12435)) - Lightweight Transport action to verify local term before fetching cluster-state from remote ([#12252](https://github.com/opensearch-project/OpenSearch/pull/12252/)) +- Integrate with admission controller for cluster-manager Read API. ([#12496](https://github.com/opensearch-project/OpenSearch/pull/12496)) ### Dependencies - Bump `com.google.api.grpc:proto-google-common-protos` from 2.25.1 to 2.37.1 ([#12289](https://github.com/opensearch-project/OpenSearch/pull/12289), [#12365](https://github.com/opensearch-project/OpenSearch/pull/12365)) diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java new file mode 100644 index 0000000000000..4d1964326820e --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java @@ -0,0 +1,198 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.node.IoUsageStats; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; +import org.opensearch.rest.AbstractRestChannel; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.admin.indices.RestGetAliasesAction; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.rest.FakeRestRequest; +import org.junit.Before; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + +import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase { + + private static final Logger LOGGER = LogManager.getLogger(AdmissionForClusterManagerIT.class); + + public static final String INDEX_NAME = "test_index"; + + private String clusterManagerNodeId; + private String datanode; + private ResourceUsageCollectorService cMResourceCollector; + + private static final Settings DISABLE_ADMISSION_CONTROL = Settings.builder() + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode()) + .build(); + + private static final Settings ENFORCE_ADMISSION_CONTROL = Settings.builder() + .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50) + .build(); + + @Before + public void init() { + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode( + Settings.builder().put(DISABLE_ADMISSION_CONTROL).build() + ); + datanode = internalCluster().startDataOnlyNode(Settings.builder().put(DISABLE_ADMISSION_CONTROL).build()); + + ensureClusterSizeConsistency(); + ensureGreen(); + + // Disable the automatic resource collection + clusterManagerNodeId = internalCluster().clusterService(clusterManagerNode).localNode().getId(); + cMResourceCollector = internalCluster().getClusterManagerNodeInstance(ResourceUsageCollectorService.class); + cMResourceCollector.stop(); + + // Enable admission control + client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet(); + } + + public void testAdmissionControlEnforced() throws Exception { + cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); + + // Write API on ClusterManager + assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}")); + + // Read API on ClusterManager + GetAliasesRequest aliasesRequest = new GetAliasesRequest(); + aliasesRequest.aliases("alias1"); + try { + dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet(); + fail("expected failure"); + } catch (Exception e) { + assertTrue(e instanceof OpenSearchRejectedExecutionException); + assertTrue(e.getMessage().contains("CPU usage admission controller rejected the request")); + assertTrue(e.getMessage().contains("[indices:admin/aliases/get]")); + assertTrue(e.getMessage().contains("action-type [CLUSTER_ADMIN]")); + } + + client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet(); + GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet(); + assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1)); + + AdmissionControlService admissionControlServiceCM = internalCluster().getClusterManagerNodeInstance(AdmissionControlService.class); + + AdmissionControllerStats admissionStats = getAdmissionControlStats(admissionControlServiceCM).get( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER + ); + + assertEquals(admissionStats.rejectionCount.get(AdmissionControlActionType.CLUSTER_ADMIN.getType()).longValue(), 1); + assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType())); + assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType())); + } + + public void testAdmissionControlEnabledOnNoBreach() throws InterruptedException { + // CPU usage is less than threshold 50% + cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 35, new IoUsageStats(98)); + + // Write API on ClusterManager + assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet()); + + // Read API on ClusterManager + GetAliasesRequest aliasesRequest = new GetAliasesRequest(); + aliasesRequest.aliases("alias1"); + GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet(); + assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1)); + } + + public void testAdmissionControlMonitorOnBreach() throws InterruptedException { + admissionControlDisabledOnBreach( + Settings.builder().put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()).build() + ); + } + + public void testAdmissionControlDisabledOnBreach() throws InterruptedException { + admissionControlDisabledOnBreach(DISABLE_ADMISSION_CONTROL); + } + + public void admissionControlDisabledOnBreach(Settings admission) throws InterruptedException { + client().admin().cluster().prepareUpdateSettings().setTransientSettings(admission).execute().actionGet(); + + cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 97, new IoUsageStats(98)); + + // Write API on ClusterManager + assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet()); + + // Read API on ClusterManager + GetAliasesRequest aliasesRequest = new GetAliasesRequest(); + aliasesRequest.aliases("alias1"); + GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet(); + assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1)); + + } + + public void testAdmissionControlResponseStatus() throws Exception { + cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); + + // Write API on ClusterManager + assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}")); + + // Read API on ClusterManager + FakeRestRequest aliasesRequest = new FakeRestRequest(); + aliasesRequest.params().put("name", "alias1"); + CountDownLatch waitForResponse = new CountDownLatch(1); + AtomicReference aliasResponse = new AtomicReference<>(); + AbstractRestChannel channel = new AbstractRestChannel(aliasesRequest, true) { + + @Override + public void sendResponse(RestResponse response) { + waitForResponse.countDown(); + aliasResponse.set(response); + } + }; + + RestGetAliasesAction restHandler = internalCluster().getInstance(RestGetAliasesAction.class, datanode); + restHandler.handleRequest(aliasesRequest, channel, internalCluster().getInstance(NodeClient.class, datanode)); + + waitForResponse.await(); + assertEquals(RestStatus.TOO_MANY_REQUESTS, aliasResponse.get().status()); + } + + @Override + public void tearDown() throws Exception { + client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet(); + super.tearDown(); + } + + Map getAdmissionControlStats(AdmissionControlService admissionControlService) { + Map acStats = new HashMap<>(); + for (AdmissionControllerStats admissionControllerStats : admissionControlService.stats().getAdmissionControllerStatsList()) { + acStats.put(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats); + } + return acStats; + } +} diff --git a/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java b/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java index 786d8cfb6fa1d..a5054b966b2f9 100644 --- a/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java +++ b/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java @@ -34,6 +34,7 @@ import org.opensearch.action.ActionRequest; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportChannel; @@ -65,7 +66,7 @@ protected HandledTransportAction( Writeable.Reader requestReader, String executor ) { - this(actionName, true, transportService, actionFilters, requestReader, executor); + this(actionName, true, null, transportService, actionFilters, requestReader, executor); } protected HandledTransportAction( @@ -75,19 +76,49 @@ protected HandledTransportAction( ActionFilters actionFilters, Writeable.Reader requestReader ) { - this(actionName, canTripCircuitBreaker, transportService, actionFilters, requestReader, ThreadPool.Names.SAME); + this(actionName, canTripCircuitBreaker, null, transportService, actionFilters, requestReader, ThreadPool.Names.SAME); } protected HandledTransportAction( String actionName, boolean canTripCircuitBreaker, + AdmissionControlActionType admissionControlActionType, + TransportService transportService, + ActionFilters actionFilters, + Writeable.Reader requestReader + ) { + this( + actionName, + canTripCircuitBreaker, + admissionControlActionType, + transportService, + actionFilters, + requestReader, + ThreadPool.Names.SAME + ); + } + + protected HandledTransportAction( + String actionName, + boolean canTripCircuitBreaker, + AdmissionControlActionType admissionControlActionType, TransportService transportService, ActionFilters actionFilters, Writeable.Reader requestReader, String executor ) { super(actionName, actionFilters, transportService.getTaskManager()); - transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader, new TransportHandler()); + + transportService.registerRequestHandler( + actionName, + executor, + false, + canTripCircuitBreaker, + admissionControlActionType, + requestReader, + new TransportHandler() + ); + } /** diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 66ef5283212d5..5f57658e33924 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -64,6 +64,7 @@ import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.discovery.ClusterManagerNotDiscoveredException; import org.opensearch.node.NodeClosedException; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.ConnectTransportException; @@ -105,7 +106,7 @@ protected TransportClusterManagerNodeAction( Writeable.Reader request, IndexNameExpressionResolver indexNameExpressionResolver ) { - this(actionName, true, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver); + this(actionName, true, null, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver); } protected TransportClusterManagerNodeAction( @@ -118,7 +119,31 @@ protected TransportClusterManagerNodeAction( Writeable.Reader request, IndexNameExpressionResolver indexNameExpressionResolver ) { - super(actionName, canTripCircuitBreaker, transportService, actionFilters, request); + this( + actionName, + canTripCircuitBreaker, + null, + transportService, + clusterService, + threadPool, + actionFilters, + request, + indexNameExpressionResolver + ); + } + + protected TransportClusterManagerNodeAction( + String actionName, + boolean canTripCircuitBreaker, + AdmissionControlActionType admissionControlActionType, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + Writeable.Reader request, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super(actionName, canTripCircuitBreaker, admissionControlActionType, transportService, actionFilters, request); this.transportService = transportService; this.clusterService = clusterService; this.threadPool = threadPool; diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java index d8cd5af992028..d58487a475bcf 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java @@ -37,6 +37,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -59,12 +60,46 @@ protected TransportClusterManagerNodeReadAction( Writeable.Reader request, IndexNameExpressionResolver indexNameExpressionResolver ) { - this(actionName, true, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver); + this( + actionName, + true, + AdmissionControlActionType.CLUSTER_ADMIN, + transportService, + clusterService, + threadPool, + actionFilters, + request, + indexNameExpressionResolver + ); + } + + protected TransportClusterManagerNodeReadAction( + String actionName, + boolean checkSizeLimit, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + Writeable.Reader request, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + actionName, + checkSizeLimit, + null, + transportService, + clusterService, + threadPool, + actionFilters, + request, + indexNameExpressionResolver + ); } protected TransportClusterManagerNodeReadAction( String actionName, boolean checkSizeLimit, + AdmissionControlActionType admissionControlActionType, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, @@ -75,6 +110,7 @@ protected TransportClusterManagerNodeReadAction( super( actionName, checkSizeLimit, + admissionControlActionType, transportService, clusterService, threadPool, diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 40e6ea9bec669..828d2f97806f6 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -720,9 +720,11 @@ public void apply(Settings value, Settings current, Settings previous) { CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, + CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT, IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT, IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT + ) ) ); diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java index 5c180346c05e1..7ad0715a2a38e 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java @@ -67,7 +67,8 @@ private void applyForTransportLayer(String actionName, AdmissionControlActionTyp throw new OpenSearchRejectedExecutionException( String.format( Locale.ROOT, - "CPU usage admission controller rejected the request for action [%s] as CPU limit reached", + "CPU usage admission controller rejected the request for action [%s] as CPU limit reached for action-type [%s]", + actionName, admissionControlActionType.name() ) ); @@ -112,6 +113,8 @@ private long getCpuRejectionThreshold(AdmissionControlActionType admissionContro return this.settings.getSearchCPULimit(); case INDEXING: return this.settings.getIndexingCPULimit(); + case CLUSTER_ADMIN: + return this.settings.getClusterAdminCPULimit(); default: throw new IllegalArgumentException( String.format( diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java index ad6cc3ff378f0..d03b2050cd5f3 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java @@ -68,7 +68,8 @@ private void applyForTransportLayer(String actionName, AdmissionControlActionTyp throw new OpenSearchRejectedExecutionException( String.format( Locale.ROOT, - "Io usage admission controller rejected the request for action [%s] as IO limit reached", + "IO usage admission controller rejected the request for action [%s] as IO limit reached for action-type [%s]", + actionName, admissionControlActionType.name() ) ); @@ -113,6 +114,8 @@ private long getIoRejectionThreshold(AdmissionControlActionType admissionControl return this.settings.getSearchIOUsageLimit(); case INDEXING: return this.settings.getIndexingIOUsageLimit(); + case CLUSTER_ADMIN: + return this.settings.getClusterAdminIOUsageLimit(); default: throw new IllegalArgumentException( String.format( diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java index 8cf6e973ceb64..6acc440180281 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java @@ -15,7 +15,8 @@ */ public enum AdmissionControlActionType { INDEXING("indexing"), - SEARCH("search"); + SEARCH("search"), + CLUSTER_ADMIN("cluster_admin"); private final String type; @@ -38,6 +39,8 @@ public static AdmissionControlActionType fromName(String name) { return INDEXING; case "search": return SEARCH; + case "cluster_admin": + return CLUSTER_ADMIN; default: throw new IllegalArgumentException("Not Supported TransportAction Type: " + name); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java index 1bddd1446a4c4..30012176d59af 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java @@ -30,6 +30,8 @@ public static class Defaults { private AdmissionControlMode transportLayerMode; private Long searchCPULimit; private Long indexingCPULimit; + private Long clusterInfoCPULimit; + /** * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set * rejection will be performed, otherwise only rejection metrics will be populated. @@ -62,14 +64,24 @@ public static class Defaults { Setting.Property.NodeScope ); + public static final Setting CLUSTER_ADMIN_CPU_USAGE_LIMIT = Setting.longSetting( + "admission_control.cluster.admin.cpu_usage.limit", + Defaults.CPU_USAGE_LIMIT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + // currently limited to one setting will add further more settings in follow-up PR's public CpuBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) { this.transportLayerMode = CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); clusterSettings.addSettingsUpdateConsumer(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); this.searchCPULimit = SEARCH_CPU_USAGE_LIMIT.get(settings); this.indexingCPULimit = INDEXING_CPU_USAGE_LIMIT.get(settings); + this.clusterInfoCPULimit = CLUSTER_ADMIN_CPU_USAGE_LIMIT.get(settings); clusterSettings.addSettingsUpdateConsumer(INDEXING_CPU_USAGE_LIMIT, this::setIndexingCPULimit); clusterSettings.addSettingsUpdateConsumer(SEARCH_CPU_USAGE_LIMIT, this::setSearchCPULimit); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ADMIN_CPU_USAGE_LIMIT, this::setClusterInfoCPULimit); + } private void setTransportLayerMode(AdmissionControlMode admissionControlMode) { @@ -88,6 +100,10 @@ public Long getIndexingCPULimit() { return indexingCPULimit; } + public Long getClusterAdminCPULimit() { + return clusterInfoCPULimit; + } + public void setIndexingCPULimit(Long indexingCPULimit) { this.indexingCPULimit = indexingCPULimit; } @@ -95,4 +111,9 @@ public void setIndexingCPULimit(Long indexingCPULimit) { public void setSearchCPULimit(Long searchCPULimit) { this.searchCPULimit = searchCPULimit; } + + public void setClusterInfoCPULimit(Long clusterInfoCPULimit) { + this.clusterInfoCPULimit = clusterInfoCPULimit; + } + } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java index e58ed28d21605..e442906ea77d7 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java @@ -25,11 +25,14 @@ public class IoBasedAdmissionControllerSettings { */ public static class Defaults { public static final long IO_USAGE_LIMIT = 95; + public static final long CLUSTER_ADMIN_IO_USAGE_LIMIT = 100; + } private AdmissionControlMode transportLayerMode; private Long searchIOUsageLimit; private Long indexingIOUsageLimit; + private Long clusterAdminIOUsageLimit; /** * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set @@ -63,11 +66,22 @@ public static class Defaults { Setting.Property.NodeScope ); + /** + * This setting used to set the limits for cluster admin requests by default it will use default cluster_admin IO usage limit + */ + public static final Setting CLUSTER_ADMIN_IO_USAGE_LIMIT = Setting.longSetting( + "admission_control.cluster_admin.io_usage.limit", + Defaults.CLUSTER_ADMIN_IO_USAGE_LIMIT, + Setting.Property.Final, + Setting.Property.NodeScope + ); + public IoBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) { this.transportLayerMode = IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); clusterSettings.addSettingsUpdateConsumer(IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); this.searchIOUsageLimit = SEARCH_IO_USAGE_LIMIT.get(settings); this.indexingIOUsageLimit = INDEXING_IO_USAGE_LIMIT.get(settings); + this.clusterAdminIOUsageLimit = CLUSTER_ADMIN_IO_USAGE_LIMIT.get(settings); clusterSettings.addSettingsUpdateConsumer(INDEXING_IO_USAGE_LIMIT, this::setIndexingIOUsageLimit); clusterSettings.addSettingsUpdateConsumer(SEARCH_IO_USAGE_LIMIT, this::setSearchIOUsageLimit); } @@ -95,4 +109,8 @@ public Long getIndexingIOUsageLimit() { public Long getSearchIOUsageLimit() { return searchIOUsageLimit; } + + public Long getClusterAdminIOUsageLimit() { + return clusterAdminIOUsageLimit; + } } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index c04d34856814a..59130ec637a52 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -1236,7 +1236,11 @@ public void registerRequestHandler( TransportRequestHandler handler ) { validateActionName(action); - handler = interceptor.interceptHandler(action, executor, forceExecution, handler, admissionControlActionType); + if (admissionControlActionType != null) { + handler = interceptor.interceptHandler(action, executor, forceExecution, handler, admissionControlActionType); + } else { + handler = interceptor.interceptHandler(action, executor, forceExecution, handler); + } RequestHandlerRegistry reg = new RequestHandlerRegistry<>( action, requestReader, diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java index 9ce28bc7fdb40..6836ecb3d615f 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java @@ -49,7 +49,8 @@ public void testSettingsExists() { Arrays.asList( CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, - CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT + CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, + CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT ) ) ); @@ -149,4 +150,33 @@ public void testUpdateAfterGetConfiguredSettings() { assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), searchPercent); assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), indexingPercent); } + + public void testConfiguredSettingsForAdmin() { + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .put(CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50) + .build(); + + CpuBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CpuBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + settings + ); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertEquals(cpuBasedAdmissionControllerSettings.getClusterAdminCPULimit().longValue(), 50); + + Settings updatedSettings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .put(CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 90) + .build(); + clusterService.getClusterSettings().applySettings(updatedSettings); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.MONITOR); + assertEquals(cpuBasedAdmissionControllerSettings.getClusterAdminCPULimit().longValue(), 90); + + } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java index ff777c175ec0e..c462f9700264d 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java @@ -72,6 +72,10 @@ public void testDefaultSettings() { assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), percent); assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), percent); + assertEquals( + ioBasedAdmissionControllerSettings.getClusterAdminIOUsageLimit().longValue(), + IoBasedAdmissionControllerSettings.Defaults.CLUSTER_ADMIN_IO_USAGE_LIMIT + ); } public void testGetConfiguredSettings() { @@ -134,6 +138,10 @@ public void testUpdateAfterGetConfiguredSettings() { assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent); assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), percent); + assertEquals( + ioBasedAdmissionControllerSettings.getClusterAdminIOUsageLimit().longValue(), + IoBasedAdmissionControllerSettings.Defaults.CLUSTER_ADMIN_IO_USAGE_LIMIT + ); Settings updatedSettings = Settings.builder() .put( @@ -146,6 +154,10 @@ public void testUpdateAfterGetConfiguredSettings() { assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.MONITOR); assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent); assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), indexingPercent); + assertEquals( + ioBasedAdmissionControllerSettings.getClusterAdminIOUsageLimit().longValue(), + IoBasedAdmissionControllerSettings.Defaults.CLUSTER_ADMIN_IO_USAGE_LIMIT + ); searchPercent = 70; updatedSettings = Settings.builder() @@ -156,5 +168,9 @@ public void testUpdateAfterGetConfiguredSettings() { clusterService.getClusterSettings().applySettings(updatedSettings); assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent); assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), indexingPercent); + assertEquals( + ioBasedAdmissionControllerSettings.getClusterAdminIOUsageLimit().longValue(), + IoBasedAdmissionControllerSettings.Defaults.CLUSTER_ADMIN_IO_USAGE_LIMIT + ); } }