From 8fc88709f78d2e7dcf5531d6cf79577028f67fdb Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Wed, 28 Feb 2024 18:36:57 +0530 Subject: [PATCH 1/5] Integrate with CPU admission controller for cluster-manager Read API's. The admission control is enforced at the transport layer. Signed-off-by: Rajiv Kumar Vaidyanathan --- .../AdmissionForClusterManagerIT.java | 135 ++++++++++++++++++ .../support/HandledTransportAction.java | 47 +++++- .../TransportClusterManagerNodeAction.java | 29 +++- ...TransportClusterManagerNodeReadAction.java | 38 ++++- .../common/settings/ClusterSettings.java | 5 +- .../CpuBasedAdmissionController.java | 5 +- .../enums/AdmissionControlActionType.java | 5 +- .../CpuBasedAdmissionControllerSettings.java | 21 +++ 8 files changed, 276 insertions(+), 9 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java new file mode 100644 index 0000000000000..a721cec1b9ca4 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java @@ -0,0 +1,135 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest; +import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.Before; + +import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.CLUSTER_INFO_CPU_USAGE_LIMIT; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase { + + private static final Logger LOGGER = LogManager.getLogger(AdmissionForClusterManagerIT.class); + + public static final String INDEX_NAME = "test_index"; + + private String clusterManagerNodeId; + private ResourceUsageCollectorService cMResourceCollector; + + private static final Settings DISABLE_ADMISSION_CONTROL = Settings.builder() + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED) + .build(); + + private static final Settings ENFORCE_ADMISSION_CONTROL = Settings.builder() + .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(CLUSTER_INFO_CPU_USAGE_LIMIT.getKey(), 50) + .build(); + + @Before + public void init() { + String clusterManagerNode = internalCluster().startClusterManagerOnlyNode( + Settings.builder().put(DISABLE_ADMISSION_CONTROL).build() + ); + internalCluster().startDataOnlyNode(Settings.builder().put(DISABLE_ADMISSION_CONTROL).build()); + + ensureClusterSizeConsistency(); + ensureGreen(); + + // Disable the automatic resource collection + clusterManagerNodeId = internalCluster().clusterService(clusterManagerNode).localNode().getId(); + cMResourceCollector = internalCluster().getClusterManagerNodeInstance(ResourceUsageCollectorService.class); + cMResourceCollector.stop(); + + // Enable admission control + client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet(); + } + + public void testAdmissionControlEnforced() throws InterruptedException { + cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99); + + // Write API on ClusterManager + assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}")); + + // Read API on ClusterManager + GetAliasesRequest aliasesRequest = new GetAliasesRequest(); + aliasesRequest.aliases("alias1"); + try { + dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet(); + fail("expected failure"); + } catch (Exception e) { + assertTrue(e instanceof OpenSearchRejectedExecutionException); + } + + client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet(); + + GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet(); + assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1)); + + AdmissionControlService admissionControlServicePrimary = internalCluster().getClusterManagerNodeInstance( + AdmissionControlService.class + ); + AdmissionControllerStats admissionStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); + assertEquals(admissionStats.rejectionCount.get(AdmissionControlActionType.CLUSTER_INFO.getType()).longValue(), 1); + assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType())); + assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType())); + } + + public void testAdmissionControlEnabledOnNoBreach() throws InterruptedException { + // CPU usage is less than threshold 50% + cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 35); + + // Write API on ClusterManager + assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet()); + + // Read API on ClusterManager + GetAliasesRequest aliasesRequest = new GetAliasesRequest(); + aliasesRequest.aliases("alias1"); + GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet(); + assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1)); + } + + public void testAdmissionControlDisabledOnBreach() throws InterruptedException { + client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet(); + + cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 97); + + // Write API on ClusterManager + assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet()); + + // Read API on ClusterManager + GetAliasesRequest aliasesRequest = new GetAliasesRequest(); + aliasesRequest.aliases("alias1"); + GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet(); + assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1)); + + } + + @Override + public void tearDown() throws Exception { + client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet(); + super.tearDown(); + } +} diff --git a/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java b/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java index 786d8cfb6fa1d..da419221378ee 100644 --- a/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java +++ b/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java @@ -34,6 +34,7 @@ import org.opensearch.action.ActionRequest; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportChannel; @@ -65,7 +66,7 @@ protected HandledTransportAction( Writeable.Reader requestReader, String executor ) { - this(actionName, true, transportService, actionFilters, requestReader, executor); + this(actionName, true, null, transportService, actionFilters, requestReader, executor); } protected HandledTransportAction( @@ -75,19 +76,59 @@ protected HandledTransportAction( ActionFilters actionFilters, Writeable.Reader requestReader ) { - this(actionName, canTripCircuitBreaker, transportService, actionFilters, requestReader, ThreadPool.Names.SAME); + this(actionName, canTripCircuitBreaker, null, transportService, actionFilters, requestReader, ThreadPool.Names.SAME); } protected HandledTransportAction( String actionName, boolean canTripCircuitBreaker, + AdmissionControlActionType admissionControlActionType, + TransportService transportService, + ActionFilters actionFilters, + Writeable.Reader requestReader + ) { + this( + actionName, + canTripCircuitBreaker, + admissionControlActionType, + transportService, + actionFilters, + requestReader, + ThreadPool.Names.SAME + ); + } + + protected HandledTransportAction( + String actionName, + boolean canTripCircuitBreaker, + AdmissionControlActionType admissionControlActionType, TransportService transportService, ActionFilters actionFilters, Writeable.Reader requestReader, String executor ) { super(actionName, actionFilters, transportService.getTaskManager()); - transportService.registerRequestHandler(actionName, executor, false, canTripCircuitBreaker, requestReader, new TransportHandler()); + + if (admissionControlActionType != null) { + transportService.registerRequestHandler( + actionName, + executor, + false, + canTripCircuitBreaker, + admissionControlActionType, + requestReader, + new TransportHandler() + ); + } else { + transportService.registerRequestHandler( + actionName, + executor, + false, + canTripCircuitBreaker, + requestReader, + new TransportHandler() + ); + } } /** diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java index 6a081f9dfecde..12d572b30272a 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeAction.java @@ -64,6 +64,7 @@ import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.discovery.ClusterManagerNotDiscoveredException; import org.opensearch.node.NodeClosedException; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.ConnectTransportException; @@ -105,7 +106,7 @@ protected TransportClusterManagerNodeAction( Writeable.Reader request, IndexNameExpressionResolver indexNameExpressionResolver ) { - this(actionName, true, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver); + this(actionName, true, null, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver); } protected TransportClusterManagerNodeAction( @@ -118,7 +119,31 @@ protected TransportClusterManagerNodeAction( Writeable.Reader request, IndexNameExpressionResolver indexNameExpressionResolver ) { - super(actionName, canTripCircuitBreaker, transportService, actionFilters, request); + this( + actionName, + canTripCircuitBreaker, + null, + transportService, + clusterService, + threadPool, + actionFilters, + request, + indexNameExpressionResolver + ); + } + + protected TransportClusterManagerNodeAction( + String actionName, + boolean canTripCircuitBreaker, + AdmissionControlActionType admissionControlActionType, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + Writeable.Reader request, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super(actionName, canTripCircuitBreaker, admissionControlActionType, transportService, actionFilters, request); this.transportService = transportService; this.clusterService = clusterService; this.threadPool = threadPool; diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java index d8cd5af992028..43f030c5ae902 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java @@ -37,6 +37,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.core.action.ActionResponse; import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -59,12 +60,46 @@ protected TransportClusterManagerNodeReadAction( Writeable.Reader request, IndexNameExpressionResolver indexNameExpressionResolver ) { - this(actionName, true, transportService, clusterService, threadPool, actionFilters, request, indexNameExpressionResolver); + this( + actionName, + true, + AdmissionControlActionType.CLUSTER_INFO, + transportService, + clusterService, + threadPool, + actionFilters, + request, + indexNameExpressionResolver + ); + } + + protected TransportClusterManagerNodeReadAction( + String actionName, + boolean checkSizeLimit, + TransportService transportService, + ClusterService clusterService, + ThreadPool threadPool, + ActionFilters actionFilters, + Writeable.Reader request, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + actionName, + checkSizeLimit, + null, + transportService, + clusterService, + threadPool, + actionFilters, + request, + indexNameExpressionResolver + ); } protected TransportClusterManagerNodeReadAction( String actionName, boolean checkSizeLimit, + AdmissionControlActionType admissionControlActionType, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, @@ -75,6 +110,7 @@ protected TransportClusterManagerNodeReadAction( super( actionName, checkSizeLimit, + admissionControlActionType, transportService, clusterService, threadPool, diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 7c8afb6b5c1b5..a16f75eec9552 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -708,16 +708,19 @@ public void apply(Settings value, Settings current, Settings previous) { RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING, IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, + IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, + AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, + CpuBasedAdmissionControllerSettings.CLUSTER_INFO_CPU_USAGE_LIMIT, + IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT, IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT, IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, - // Concurrent segment search settings SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java index 5c180346c05e1..6c8cf70c408eb 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java @@ -67,7 +67,8 @@ private void applyForTransportLayer(String actionName, AdmissionControlActionTyp throw new OpenSearchRejectedExecutionException( String.format( Locale.ROOT, - "CPU usage admission controller rejected the request for action [%s] as CPU limit reached", + "CPU usage admission controller rejected the request for action [%s] as CPU limit reached for action-type [%s]", + actionName, admissionControlActionType.name() ) ); @@ -112,6 +113,8 @@ private long getCpuRejectionThreshold(AdmissionControlActionType admissionContro return this.settings.getSearchCPULimit(); case INDEXING: return this.settings.getIndexingCPULimit(); + case CLUSTER_INFO: + return this.settings.getClusterInfoCPULimit(); default: throw new IllegalArgumentException( String.format( diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java index 8cf6e973ceb64..67ae2cfb113b7 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java @@ -15,7 +15,8 @@ */ public enum AdmissionControlActionType { INDEXING("indexing"), - SEARCH("search"); + SEARCH("search"), + CLUSTER_INFO("cluster_info"); private final String type; @@ -38,6 +39,8 @@ public static AdmissionControlActionType fromName(String name) { return INDEXING; case "search": return SEARCH; + case "cluster_info": + return CLUSTER_INFO; default: throw new IllegalArgumentException("Not Supported TransportAction Type: " + name); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java index 1bddd1446a4c4..f09ea3ca06537 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java @@ -30,6 +30,8 @@ public static class Defaults { private AdmissionControlMode transportLayerMode; private Long searchCPULimit; private Long indexingCPULimit; + private Long clusterInfoCPULimit; + /** * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set * rejection will be performed, otherwise only rejection metrics will be populated. @@ -62,14 +64,24 @@ public static class Defaults { Setting.Property.NodeScope ); + public static final Setting CLUSTER_INFO_CPU_USAGE_LIMIT = Setting.longSetting( + "admission_control.cluster.admin.cpu_usage.limit", + Defaults.CPU_USAGE_LIMIT, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + // currently limited to one setting will add further more settings in follow-up PR's public CpuBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) { this.transportLayerMode = CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); clusterSettings.addSettingsUpdateConsumer(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); this.searchCPULimit = SEARCH_CPU_USAGE_LIMIT.get(settings); this.indexingCPULimit = INDEXING_CPU_USAGE_LIMIT.get(settings); + this.clusterInfoCPULimit = CLUSTER_INFO_CPU_USAGE_LIMIT.get(settings); clusterSettings.addSettingsUpdateConsumer(INDEXING_CPU_USAGE_LIMIT, this::setIndexingCPULimit); clusterSettings.addSettingsUpdateConsumer(SEARCH_CPU_USAGE_LIMIT, this::setSearchCPULimit); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_INFO_CPU_USAGE_LIMIT, this::setClusterInfoCPULimit); + } private void setTransportLayerMode(AdmissionControlMode admissionControlMode) { @@ -88,6 +100,10 @@ public Long getIndexingCPULimit() { return indexingCPULimit; } + public Long getClusterInfoCPULimit() { + return clusterInfoCPULimit; + } + public void setIndexingCPULimit(Long indexingCPULimit) { this.indexingCPULimit = indexingCPULimit; } @@ -95,4 +111,9 @@ public void setIndexingCPULimit(Long indexingCPULimit) { public void setSearchCPULimit(Long searchCPULimit) { this.searchCPULimit = searchCPULimit; } + + public void setClusterInfoCPULimit(Long clusterInfoCPULimit) { + this.clusterInfoCPULimit = clusterInfoCPULimit; + } + } From a349fd80493dd0eecc4c71e0172c1ef64b956e73 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Wed, 13 Mar 2024 14:49:08 +0530 Subject: [PATCH 2/5] fixed the PR comments and added Settings test Signed-off-by: Rajiv Kumar Vaidyanathan --- .../AdmissionForClusterManagerIT.java | 6 ++-- .../support/HandledTransportAction.java | 30 ++++++----------- ...TransportClusterManagerNodeReadAction.java | 2 +- .../common/settings/ClusterSettings.java | 7 ++++ .../CpuBasedAdmissionController.java | 4 +-- .../enums/AdmissionControlActionType.java | 17 ++++------ .../CpuBasedAdmissionControllerSettings.java | 8 ++--- .../transport/TransportService.java | 6 +++- ...BasedAdmissionControllerSettingsTests.java | 32 ++++++++++++++++++- 9 files changed, 70 insertions(+), 42 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java index a721cec1b9ca4..11cce10d93e69 100644 --- a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java @@ -24,7 +24,7 @@ import org.junit.Before; import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; -import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.CLUSTER_INFO_CPU_USAGE_LIMIT; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; @@ -45,7 +45,7 @@ public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase { private static final Settings ENFORCE_ADMISSION_CONTROL = Settings.builder() .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) - .put(CLUSTER_INFO_CPU_USAGE_LIMIT.getKey(), 50) + .put(CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50) .build(); @Before @@ -92,7 +92,7 @@ public void testAdmissionControlEnforced() throws InterruptedException { AdmissionControlService.class ); AdmissionControllerStats admissionStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); - assertEquals(admissionStats.rejectionCount.get(AdmissionControlActionType.CLUSTER_INFO.getType()).longValue(), 1); + assertEquals(admissionStats.rejectionCount.get(AdmissionControlActionType.CLUSTER_ADMIN.getType()).longValue(), 1); assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType())); assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType())); } diff --git a/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java b/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java index da419221378ee..a5054b966b2f9 100644 --- a/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java +++ b/server/src/main/java/org/opensearch/action/support/HandledTransportAction.java @@ -109,26 +109,16 @@ protected HandledTransportAction( ) { super(actionName, actionFilters, transportService.getTaskManager()); - if (admissionControlActionType != null) { - transportService.registerRequestHandler( - actionName, - executor, - false, - canTripCircuitBreaker, - admissionControlActionType, - requestReader, - new TransportHandler() - ); - } else { - transportService.registerRequestHandler( - actionName, - executor, - false, - canTripCircuitBreaker, - requestReader, - new TransportHandler() - ); - } + transportService.registerRequestHandler( + actionName, + executor, + false, + canTripCircuitBreaker, + admissionControlActionType, + requestReader, + new TransportHandler() + ); + } /** diff --git a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java index 43f030c5ae902..d58487a475bcf 100644 --- a/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java +++ b/server/src/main/java/org/opensearch/action/support/clustermanager/TransportClusterManagerNodeReadAction.java @@ -63,7 +63,7 @@ protected TransportClusterManagerNodeReadAction( this( actionName, true, - AdmissionControlActionType.CLUSTER_INFO, + AdmissionControlActionType.CLUSTER_ADMIN, transportService, clusterService, threadPool, diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index a16f75eec9552..fdec5918e70c0 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -710,6 +710,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, + // Admission Control Settings AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, @@ -719,8 +720,14 @@ public void apply(Settings value, Settings current, Settings previous) { IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT, IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT, +<<<<<<< HEAD IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, +======= + CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT, + IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, + +>>>>>>> 28c03ead970 (fixed the PR comments and added Settings test) // Concurrent segment search settings SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java index 6c8cf70c408eb..7ad0715a2a38e 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CpuBasedAdmissionController.java @@ -113,8 +113,8 @@ private long getCpuRejectionThreshold(AdmissionControlActionType admissionContro return this.settings.getSearchCPULimit(); case INDEXING: return this.settings.getIndexingCPULimit(); - case CLUSTER_INFO: - return this.settings.getClusterInfoCPULimit(); + case CLUSTER_ADMIN: + return this.settings.getClusterAdminCPULimit(); default: throw new IllegalArgumentException( String.format( diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java index 67ae2cfb113b7..168372c4724c9 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java @@ -16,7 +16,7 @@ public enum AdmissionControlActionType { INDEXING("indexing"), SEARCH("search"), - CLUSTER_INFO("cluster_info"); + CLUSTER_ADMIN("cluster_admin"); private final String type; @@ -34,15 +34,12 @@ public String getType() { public static AdmissionControlActionType fromName(String name) { name = name.toLowerCase(Locale.ROOT); - switch (name) { - case "indexing": - return INDEXING; - case "search": - return SEARCH; - case "cluster_info": - return CLUSTER_INFO; - default: - throw new IllegalArgumentException("Not Supported TransportAction Type: " + name); + + for (AdmissionControlActionType type : AdmissionControlActionType.values()) { + if (type.getType().equals(name)) { + return type; + } } + throw new IllegalArgumentException("Not Supported TransportAction Type: " + name); } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java index f09ea3ca06537..30012176d59af 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CpuBasedAdmissionControllerSettings.java @@ -64,7 +64,7 @@ public static class Defaults { Setting.Property.NodeScope ); - public static final Setting CLUSTER_INFO_CPU_USAGE_LIMIT = Setting.longSetting( + public static final Setting CLUSTER_ADMIN_CPU_USAGE_LIMIT = Setting.longSetting( "admission_control.cluster.admin.cpu_usage.limit", Defaults.CPU_USAGE_LIMIT, Setting.Property.Dynamic, @@ -77,10 +77,10 @@ public CpuBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Sett clusterSettings.addSettingsUpdateConsumer(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); this.searchCPULimit = SEARCH_CPU_USAGE_LIMIT.get(settings); this.indexingCPULimit = INDEXING_CPU_USAGE_LIMIT.get(settings); - this.clusterInfoCPULimit = CLUSTER_INFO_CPU_USAGE_LIMIT.get(settings); + this.clusterInfoCPULimit = CLUSTER_ADMIN_CPU_USAGE_LIMIT.get(settings); clusterSettings.addSettingsUpdateConsumer(INDEXING_CPU_USAGE_LIMIT, this::setIndexingCPULimit); clusterSettings.addSettingsUpdateConsumer(SEARCH_CPU_USAGE_LIMIT, this::setSearchCPULimit); - clusterSettings.addSettingsUpdateConsumer(CLUSTER_INFO_CPU_USAGE_LIMIT, this::setClusterInfoCPULimit); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ADMIN_CPU_USAGE_LIMIT, this::setClusterInfoCPULimit); } @@ -100,7 +100,7 @@ public Long getIndexingCPULimit() { return indexingCPULimit; } - public Long getClusterInfoCPULimit() { + public Long getClusterAdminCPULimit() { return clusterInfoCPULimit; } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 652d57f4c5348..d08b28730d417 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -1214,7 +1214,11 @@ public void registerRequestHandler( TransportRequestHandler handler ) { validateActionName(action); - handler = interceptor.interceptHandler(action, executor, forceExecution, handler, admissionControlActionType); + if (admissionControlActionType != null) { + handler = interceptor.interceptHandler(action, executor, forceExecution, handler, admissionControlActionType); + } else { + handler = interceptor.interceptHandler(action, executor, forceExecution, handler); + } RequestHandlerRegistry reg = new RequestHandlerRegistry<>( action, requestReader, diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java index 9ce28bc7fdb40..6836ecb3d615f 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java @@ -49,7 +49,8 @@ public void testSettingsExists() { Arrays.asList( CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, - CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT + CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, + CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT ) ) ); @@ -149,4 +150,33 @@ public void testUpdateAfterGetConfiguredSettings() { assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), searchPercent); assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), indexingPercent); } + + public void testConfiguredSettingsForAdmin() { + Settings settings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .put(CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 50) + .build(); + + CpuBasedAdmissionControllerSettings cpuBasedAdmissionControllerSettings = new CpuBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + settings + ); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertEquals(cpuBasedAdmissionControllerSettings.getClusterAdminCPULimit().longValue(), 50); + + Settings updatedSettings = Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .put(CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT.getKey(), 90) + .build(); + clusterService.getClusterSettings().applySettings(updatedSettings); + assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.MONITOR); + assertEquals(cpuBasedAdmissionControllerSettings.getClusterAdminCPULimit().longValue(), 90); + + } } From f7514611b60bd327c2a740f1a1a745e147643300 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Wed, 13 Mar 2024 14:49:08 +0530 Subject: [PATCH 3/5] fixed the PR comments and added Settings test Signed-off-by: Rajiv Kumar Vaidyanathan --- .../admissioncontrol/AdmissionForClusterManagerIT.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java index 11cce10d93e69..fa03a89eea8d8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java @@ -15,6 +15,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.node.IoUsageStats; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.node.resource.tracker.ResourceTrackerSettings; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; @@ -68,7 +69,7 @@ public void init() { } public void testAdmissionControlEnforced() throws InterruptedException { - cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99); + cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); // Write API on ClusterManager assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}")); @@ -99,7 +100,7 @@ public void testAdmissionControlEnforced() throws InterruptedException { public void testAdmissionControlEnabledOnNoBreach() throws InterruptedException { // CPU usage is less than threshold 50% - cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 35); + cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 35, new IoUsageStats(98)); // Write API on ClusterManager assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet()); @@ -114,7 +115,7 @@ public void testAdmissionControlEnabledOnNoBreach() throws InterruptedException public void testAdmissionControlDisabledOnBreach() throws InterruptedException { client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet(); - cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 97); + cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 97, new IoUsageStats(98)); // Write API on ClusterManager assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}").execute().actionGet()); From e3b757041d2d7bc1fb20e5d3fa928543769044af Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Tue, 19 Mar 2024 00:26:41 +0530 Subject: [PATCH 4/5] testcase to verify 429 response Signed-off-by: Rajiv Kumar Vaidyanathan --- .../AdmissionForClusterManagerIT.java | 42 +++++++++++++++++-- .../common/settings/ClusterSettings.java | 5 +-- 2 files changed, 40 insertions(+), 7 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java index fa03a89eea8d8..e4c8086542789 100644 --- a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java @@ -12,18 +12,27 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest; import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse; +import org.opensearch.client.node.NodeClient; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.core.rest.RestStatus; import org.opensearch.node.IoUsageStats; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.node.resource.tracker.ResourceTrackerSettings; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; +import org.opensearch.rest.AbstractRestChannel; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.admin.indices.RestGetAliasesAction; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.test.rest.FakeRestRequest; import org.junit.Before; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; + import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -37,6 +46,7 @@ public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase { public static final String INDEX_NAME = "test_index"; private String clusterManagerNodeId; + private String datanode; private ResourceUsageCollectorService cMResourceCollector; private static final Settings DISABLE_ADMISSION_CONTROL = Settings.builder() @@ -54,7 +64,7 @@ public void init() { String clusterManagerNode = internalCluster().startClusterManagerOnlyNode( Settings.builder().put(DISABLE_ADMISSION_CONTROL).build() ); - internalCluster().startDataOnlyNode(Settings.builder().put(DISABLE_ADMISSION_CONTROL).build()); + datanode = internalCluster().startDataOnlyNode(Settings.builder().put(DISABLE_ADMISSION_CONTROL).build()); ensureClusterSizeConsistency(); ensureGreen(); @@ -68,7 +78,7 @@ public void init() { client().admin().cluster().prepareUpdateSettings().setTransientSettings(ENFORCE_ADMISSION_CONTROL).execute().actionGet(); } - public void testAdmissionControlEnforced() throws InterruptedException { + public void testAdmissionControlEnforced() throws Exception { cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); // Write API on ClusterManager @@ -85,7 +95,6 @@ public void testAdmissionControlEnforced() throws InterruptedException { } client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet(); - GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet(); assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1)); @@ -128,6 +137,33 @@ public void testAdmissionControlDisabledOnBreach() throws InterruptedException { } + public void testAdmissionControlResponseStatus() throws Exception { + cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); + + // Write API on ClusterManager + assertAcked(prepareCreate("test").setMapping("field", "type=text").setAliases("{\"alias1\" : {}}")); + + // Read API on ClusterManager + FakeRestRequest aliasesRequest = new FakeRestRequest(); + aliasesRequest.params().put("name", "alias1"); + CountDownLatch waitForResponse = new CountDownLatch(1); + AtomicReference aliasResponse = new AtomicReference<>(); + AbstractRestChannel channel = new AbstractRestChannel(aliasesRequest, true) { + + @Override + public void sendResponse(RestResponse response) { + waitForResponse.countDown(); + aliasResponse.set(response); + } + }; + + RestGetAliasesAction restHandler = internalCluster().getInstance(RestGetAliasesAction.class, datanode); + restHandler.handleRequest(aliasesRequest, channel, internalCluster().getInstance(NodeClient.class, datanode)); + + waitForResponse.await(); + assertEquals(RestStatus.TOO_MANY_REQUESTS, aliasResponse.get().status()); + } + @Override public void tearDown() throws Exception { client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet(); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index fdec5918e70c0..89f3769f09b6b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -716,18 +716,15 @@ public void apply(Settings value, Settings current, Settings previous) { CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, CpuBasedAdmissionControllerSettings.CLUSTER_INFO_CPU_USAGE_LIMIT, + CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT, IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT, IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT, -<<<<<<< HEAD IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, -======= - CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT, IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, ->>>>>>> 28c03ead970 (fixed the PR comments and added Settings test) // Concurrent segment search settings SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING From d9e42b55a67b7d6d3a3f87f93f565c33c1b20bc2 Mon Sep 17 00:00:00 2001 From: Rajiv Kumar Vaidyanathan Date: Tue, 19 Mar 2024 23:08:26 +0530 Subject: [PATCH 5/5] updated changelog Signed-off-by: Rajiv Kumar Vaidyanathan --- CHANGELOG.md | 1 + .../AdmissionForClusterManagerIT.java | 36 ++++++++++++++++--- .../common/settings/ClusterSettings.java | 6 +--- .../IoBasedAdmissionController.java | 5 ++- .../enums/AdmissionControlActionType.java | 15 ++++---- .../IoBasedAdmissionControllerSettings.java | 18 ++++++++++ ...BasedAdmissionControllerSettingsTests.java | 16 +++++++++ 7 files changed, 80 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c65b20b99bc25..9dced730cbcad 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -121,6 +121,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Introduce a new setting `index.check_pending_flush.enabled` to expose the ability to disable the check for pending flushes by write threads ([#12710](https://github.com/opensearch-project/OpenSearch/pull/12710)) - Built-in secure transports support ([#12435](https://github.com/opensearch-project/OpenSearch/pull/12435)) - Lightweight Transport action to verify local term before fetching cluster-state from remote ([#12252](https://github.com/opensearch-project/OpenSearch/pull/12252/)) +- Integrate with admission controller for cluster-manager Read API. ([#12496](https://github.com/opensearch-project/OpenSearch/pull/12496)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java index e4c8086542789..4d1964326820e 100644 --- a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionForClusterManagerIT.java @@ -20,6 +20,7 @@ import org.opensearch.node.IoUsageStats; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; @@ -30,6 +31,8 @@ import org.opensearch.test.rest.FakeRestRequest; import org.junit.Before; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -50,7 +53,7 @@ public class AdmissionForClusterManagerIT extends OpenSearchIntegTestCase { private ResourceUsageCollectorService cMResourceCollector; private static final Settings DISABLE_ADMISSION_CONTROL = Settings.builder() - .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode()) .build(); private static final Settings ENFORCE_ADMISSION_CONTROL = Settings.builder() @@ -92,16 +95,21 @@ public void testAdmissionControlEnforced() throws Exception { fail("expected failure"); } catch (Exception e) { assertTrue(e instanceof OpenSearchRejectedExecutionException); + assertTrue(e.getMessage().contains("CPU usage admission controller rejected the request")); + assertTrue(e.getMessage().contains("[indices:admin/aliases/get]")); + assertTrue(e.getMessage().contains("action-type [CLUSTER_ADMIN]")); } client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet(); GetAliasesResponse getAliasesResponse = dataNodeClient().admin().indices().getAliases(aliasesRequest).actionGet(); assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1)); - AdmissionControlService admissionControlServicePrimary = internalCluster().getClusterManagerNodeInstance( - AdmissionControlService.class + AdmissionControlService admissionControlServiceCM = internalCluster().getClusterManagerNodeInstance(AdmissionControlService.class); + + AdmissionControllerStats admissionStats = getAdmissionControlStats(admissionControlServiceCM).get( + CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER ); - AdmissionControllerStats admissionStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); + assertEquals(admissionStats.rejectionCount.get(AdmissionControlActionType.CLUSTER_ADMIN.getType()).longValue(), 1); assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType())); assertNull(admissionStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType())); @@ -121,8 +129,18 @@ public void testAdmissionControlEnabledOnNoBreach() throws InterruptedException assertThat(getAliasesResponse.getAliases().get("test").size(), equalTo(1)); } + public void testAdmissionControlMonitorOnBreach() throws InterruptedException { + admissionControlDisabledOnBreach( + Settings.builder().put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()).build() + ); + } + public void testAdmissionControlDisabledOnBreach() throws InterruptedException { - client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet(); + admissionControlDisabledOnBreach(DISABLE_ADMISSION_CONTROL); + } + + public void admissionControlDisabledOnBreach(Settings admission) throws InterruptedException { + client().admin().cluster().prepareUpdateSettings().setTransientSettings(admission).execute().actionGet(); cMResourceCollector.collectNodeResourceUsageStats(clusterManagerNodeId, System.currentTimeMillis(), 97, 97, new IoUsageStats(98)); @@ -169,4 +187,12 @@ public void tearDown() throws Exception { client().admin().cluster().prepareUpdateSettings().setTransientSettings(DISABLE_ADMISSION_CONTROL).execute().actionGet(); super.tearDown(); } + + Map getAdmissionControlStats(AdmissionControlService admissionControlService) { + Map acStats = new HashMap<>(); + for (AdmissionControllerStats admissionControllerStats : admissionControlService.stats().getAdmissionControllerStatsList()) { + acStats.put(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats); + } + return acStats; + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 89f3769f09b6b..4f1815de224db 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -709,21 +709,17 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING, IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, + IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, // Admission Control Settings AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, - CpuBasedAdmissionControllerSettings.CLUSTER_INFO_CPU_USAGE_LIMIT, CpuBasedAdmissionControllerSettings.CLUSTER_ADMIN_CPU_USAGE_LIMIT, - IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT, IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT, - IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, - IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, - IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, // Concurrent segment search settings SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java index ad6cc3ff378f0..d03b2050cd5f3 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java @@ -68,7 +68,8 @@ private void applyForTransportLayer(String actionName, AdmissionControlActionTyp throw new OpenSearchRejectedExecutionException( String.format( Locale.ROOT, - "Io usage admission controller rejected the request for action [%s] as IO limit reached", + "IO usage admission controller rejected the request for action [%s] as IO limit reached for action-type [%s]", + actionName, admissionControlActionType.name() ) ); @@ -113,6 +114,8 @@ private long getIoRejectionThreshold(AdmissionControlActionType admissionControl return this.settings.getSearchIOUsageLimit(); case INDEXING: return this.settings.getIndexingIOUsageLimit(); + case CLUSTER_ADMIN: + return this.settings.getClusterAdminIOUsageLimit(); default: throw new IllegalArgumentException( String.format( diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java index 168372c4724c9..6acc440180281 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java @@ -34,12 +34,15 @@ public String getType() { public static AdmissionControlActionType fromName(String name) { name = name.toLowerCase(Locale.ROOT); - - for (AdmissionControlActionType type : AdmissionControlActionType.values()) { - if (type.getType().equals(name)) { - return type; - } + switch (name) { + case "indexing": + return INDEXING; + case "search": + return SEARCH; + case "cluster_admin": + return CLUSTER_ADMIN; + default: + throw new IllegalArgumentException("Not Supported TransportAction Type: " + name); } - throw new IllegalArgumentException("Not Supported TransportAction Type: " + name); } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java index e58ed28d21605..e442906ea77d7 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java @@ -25,11 +25,14 @@ public class IoBasedAdmissionControllerSettings { */ public static class Defaults { public static final long IO_USAGE_LIMIT = 95; + public static final long CLUSTER_ADMIN_IO_USAGE_LIMIT = 100; + } private AdmissionControlMode transportLayerMode; private Long searchIOUsageLimit; private Long indexingIOUsageLimit; + private Long clusterAdminIOUsageLimit; /** * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set @@ -63,11 +66,22 @@ public static class Defaults { Setting.Property.NodeScope ); + /** + * This setting used to set the limits for cluster admin requests by default it will use default cluster_admin IO usage limit + */ + public static final Setting CLUSTER_ADMIN_IO_USAGE_LIMIT = Setting.longSetting( + "admission_control.cluster_admin.io_usage.limit", + Defaults.CLUSTER_ADMIN_IO_USAGE_LIMIT, + Setting.Property.Final, + Setting.Property.NodeScope + ); + public IoBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) { this.transportLayerMode = IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); clusterSettings.addSettingsUpdateConsumer(IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); this.searchIOUsageLimit = SEARCH_IO_USAGE_LIMIT.get(settings); this.indexingIOUsageLimit = INDEXING_IO_USAGE_LIMIT.get(settings); + this.clusterAdminIOUsageLimit = CLUSTER_ADMIN_IO_USAGE_LIMIT.get(settings); clusterSettings.addSettingsUpdateConsumer(INDEXING_IO_USAGE_LIMIT, this::setIndexingIOUsageLimit); clusterSettings.addSettingsUpdateConsumer(SEARCH_IO_USAGE_LIMIT, this::setSearchIOUsageLimit); } @@ -95,4 +109,8 @@ public Long getIndexingIOUsageLimit() { public Long getSearchIOUsageLimit() { return searchIOUsageLimit; } + + public Long getClusterAdminIOUsageLimit() { + return clusterAdminIOUsageLimit; + } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java index ff777c175ec0e..c462f9700264d 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java @@ -72,6 +72,10 @@ public void testDefaultSettings() { assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), percent); assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), percent); + assertEquals( + ioBasedAdmissionControllerSettings.getClusterAdminIOUsageLimit().longValue(), + IoBasedAdmissionControllerSettings.Defaults.CLUSTER_ADMIN_IO_USAGE_LIMIT + ); } public void testGetConfiguredSettings() { @@ -134,6 +138,10 @@ public void testUpdateAfterGetConfiguredSettings() { assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent); assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), percent); + assertEquals( + ioBasedAdmissionControllerSettings.getClusterAdminIOUsageLimit().longValue(), + IoBasedAdmissionControllerSettings.Defaults.CLUSTER_ADMIN_IO_USAGE_LIMIT + ); Settings updatedSettings = Settings.builder() .put( @@ -146,6 +154,10 @@ public void testUpdateAfterGetConfiguredSettings() { assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.MONITOR); assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent); assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), indexingPercent); + assertEquals( + ioBasedAdmissionControllerSettings.getClusterAdminIOUsageLimit().longValue(), + IoBasedAdmissionControllerSettings.Defaults.CLUSTER_ADMIN_IO_USAGE_LIMIT + ); searchPercent = 70; updatedSettings = Settings.builder() @@ -156,5 +168,9 @@ public void testUpdateAfterGetConfiguredSettings() { clusterService.getClusterSettings().applySettings(updatedSettings); assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent); assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), indexingPercent); + assertEquals( + ioBasedAdmissionControllerSettings.getClusterAdminIOUsageLimit().longValue(), + IoBasedAdmissionControllerSettings.Defaults.CLUSTER_ADMIN_IO_USAGE_LIMIT + ); } }