From 128ee91dc77210f4979e66bd5e6dd8057f308bc3 Mon Sep 17 00:00:00 2001 From: Andrew Ross Date: Thu, 14 Mar 2024 10:56:48 -0700 Subject: [PATCH] Revert "Integrate IO Based AdmissionController to AdmissionControl Framework (#12583)" This reverts commit b6b16d8a85982edd7e4d43a9eb0681f94bd1d9cc. Reverting as this introduced test failures detailed in #12664. --- CHANGELOG.md | 1 - .../AdmissionControlMultiNodeIT.java | 292 ++++++++++++++++++ .../common/settings/ClusterSettings.java | 4 - .../tracker/NodeResourceUsageTracker.java | 4 - .../AdmissionControlService.java | 19 +- .../controllers/AdmissionController.java | 1 + .../IoBasedAdmissionController.java | 126 -------- .../IoBasedAdmissionControllerSettings.java | 98 ------ .../ResourceUsageCollectorServiceTests.java | 96 +++--- .../AdmissionControlServiceTests.java | 31 +- .../AdmissionControlSingleNodeTests.java | 287 ++--------------- .../IoBasedAdmissionControllerTests.java | 141 --------- ...PUBasedAdmissionControlSettingsTests.java} | 2 +- ...BasedAdmissionControllerSettingsTests.java | 160 ---------- 14 files changed, 388 insertions(+), 874 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java delete mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java delete mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java delete mode 100644 server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java rename server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/{CPUBasedAdmissionControllerSettingsTests.java => CPUBasedAdmissionControlSettingsTests.java} (98%) delete mode 100644 server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e2160941106d..33d1f6ee02027 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Remote reindex: Add support for configurable retry mechanism ([#12561](https://github.com/opensearch-project/OpenSearch/pull/12561)) - [Admission Control] Integrate IO Usage Tracker to the Resource Usage Collector Service and Emit IO Usage Stats ([#11880](https://github.com/opensearch-project/OpenSearch/pull/11880)) - Tracing for deep search path ([#12103](https://github.com/opensearch-project/OpenSearch/pull/12103)) -- [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java new file mode 100644 index 0000000000000..0af3d31f9e846 --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlMultiNodeIT.java @@ -0,0 +1,292 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.indices.stats.IndicesStatsResponse; +import org.opensearch.action.admin.indices.stats.ShardStats; +import org.opensearch.action.bulk.BulkRequest; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.ShardRouting; +import org.opensearch.common.UUIDs; +import org.opensearch.common.collect.Tuple; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; +import org.opensearch.test.OpenSearchIntegTestCase; +import org.junit.After; +import org.junit.Before; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 2, numClientNodes = 1) +public class AdmissionControlMultiNodeIT extends OpenSearchIntegTestCase { + + public static final Settings settings = Settings.builder() + .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0) + .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0) + .build(); + + private static final Logger LOGGER = LogManager.getLogger(AdmissionControlMultiNodeIT.class); + + public static final String INDEX_NAME = "test_index"; + + @Before + public void init() { + assertAcked( + prepareCreate( + INDEX_NAME, + Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + ) + ); + ensureGreen(INDEX_NAME); + } + + @After + public void cleanup() { + client().admin().indices().prepareDelete(INDEX_NAME).get(); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(settings).build(); + } + + public void testAdmissionControlRejectionOnEnforced() { + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); + AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < 3; ++i) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + bulkRequest.add(request); + } + BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus()); + AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats() + .getAdmissionControllerStatsList() + .get(0); + assertEquals(admissionControlPrimaryStats.rejectionCount.get(AdmissionControlActionType.INDEXING.getType()).longValue(), 1); + Arrays.stream(res.getItems()).forEach(bulkItemResponse -> { + assertTrue(bulkItemResponse.getFailureMessage().contains("OpenSearchRejectedExecutionException")); + }); + SearchResponse searchResponse; + try { + searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); + } catch (Exception exception) { + assertTrue(((SearchPhaseExecutionException) exception).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); + } + AdmissionControllerStats primaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); + assertEquals(primaryStats.rejectionCount.get(AdmissionControlActionType.SEARCH.getType()).longValue(), 1); + } + + public void testAdmissionControlEnforcedOnNonACEnabledActions() throws ExecutionException, InterruptedException { + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + + updateSettingsRequest.transientSettings( + Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + NodesStatsRequest nodesStatsRequest = new NodesStatsRequest(); + nodesStatsRequest.clear() + .indices(true) + .addMetrics( + NodesStatsRequest.Metric.JVM.metricName(), + NodesStatsRequest.Metric.OS.metricName(), + NodesStatsRequest.Metric.FS.metricName(), + NodesStatsRequest.Metric.PROCESS.metricName(), + NodesStatsRequest.Metric.ADMISSION_CONTROL.metricName() + ); + NodesStatsResponse nodesStatsResponse = client(coordinatingOnlyNode).admin().cluster().nodesStats(nodesStatsRequest).actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().health(new ClusterHealthRequest()).actionGet(); + assertEquals(200, clusterHealthResponse.status().getStatus()); + assertFalse(nodesStatsResponse.hasFailures()); + } + + public void testAdmissionControlRejectionOnMonitor() { + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + + AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); + AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + + updateSettingsRequest.transientSettings( + Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < 3; ++i) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + bulkRequest.add(request); + } + BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats() + .getAdmissionControllerStatsList() + .get(0); + AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats() + .getAdmissionControllerStatsList() + .get(0); + long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + assertEquals(primaryRejectionCount, 1); + assertEquals(replicaRejectionCount, 0); + SearchResponse searchResponse; + searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); + admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); + admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0); + primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + replicaRejectionCount = admissionControlReplicaStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + assertTrue(primaryRejectionCount == 1 || replicaRejectionCount == 1); + assertFalse(primaryRejectionCount == 1 && replicaRejectionCount == 1); + } + + public void testAdmissionControlRejectionOnDisabled() { + Tuple primaryReplicaNodeNames = getPrimaryReplicaNodeNames(INDEX_NAME); + String primaryName = primaryReplicaNodeNames.v1(); + String replicaName = primaryReplicaNodeNames.v2(); + String coordinatingOnlyNode = getCoordinatingOnlyNode(); + + AdmissionControlService admissionControlServicePrimary = internalCluster().getInstance(AdmissionControlService.class, primaryName); + AdmissionControlService admissionControlServiceReplica = internalCluster().getInstance(AdmissionControlService.class, replicaName); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + + updateSettingsRequest.transientSettings( + Settings.builder() + .put( + CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.DISABLED.getMode() + ) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < 3; ++i) { + IndexRequest request = new IndexRequest(INDEX_NAME).id(UUIDs.base64UUID()) + .source(Collections.singletonMap("key", randomAlphaOfLength(50))); + bulkRequest.add(request); + } + BulkResponse res = client(coordinatingOnlyNode).bulk(bulkRequest).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControllerStats admissionControlPrimaryStats = admissionControlServicePrimary.stats() + .getAdmissionControllerStatsList() + .get(0); + AdmissionControllerStats admissionControlReplicaStats = admissionControlServiceReplica.stats() + .getAdmissionControllerStatsList() + .get(0); + long primaryRejectionCount = admissionControlPrimaryStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + long replicaRejectionCount = admissionControlReplicaStats.rejectionCount.getOrDefault( + AdmissionControlActionType.INDEXING.getType(), + new AtomicLong(0).longValue() + ); + assertEquals(primaryRejectionCount, 0); + assertEquals(replicaRejectionCount, 0); + SearchResponse searchResponse; + searchResponse = client(coordinatingOnlyNode).prepareSearch(INDEX_NAME).get(); + admissionControlPrimaryStats = admissionControlServicePrimary.stats().getAdmissionControllerStatsList().get(0); + admissionControlReplicaStats = admissionControlServiceReplica.stats().getAdmissionControllerStatsList().get(0); + primaryRejectionCount = admissionControlPrimaryStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + replicaRejectionCount = admissionControlReplicaStats.getRejectionCount() + .getOrDefault(AdmissionControlActionType.SEARCH.getType(), new AtomicLong(0).longValue()); + assertTrue(primaryRejectionCount == 0 && replicaRejectionCount == 0); + } + + private Tuple getPrimaryReplicaNodeNames(String indexName) { + IndicesStatsResponse response = client().admin().indices().prepareStats(indexName).get(); + String primaryId = Stream.of(response.getShards()) + .map(ShardStats::getShardRouting) + .filter(ShardRouting::primary) + .findAny() + .get() + .currentNodeId(); + String replicaId = Stream.of(response.getShards()) + .map(ShardStats::getShardRouting) + .filter(sr -> sr.primary() == false) + .findAny() + .get() + .currentNodeId(); + DiscoveryNodes nodes = client().admin().cluster().prepareState().get().getState().nodes(); + String primaryName = nodes.get(primaryId).getName(); + String replicaName = nodes.get(replicaId).getName(); + return new Tuple<>(primaryName, replicaName); + } + + private String getCoordinatingOnlyNode() { + return client().admin() + .cluster() + .prepareState() + .get() + .getState() + .nodes() + .getCoordinatingOnlyNodes() + .values() + .iterator() + .next() + .getName(); + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index de7b25b12739a..5090010198a5d 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -140,7 +140,6 @@ import org.opensearch.plugins.PluginsService; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings; import org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings; -import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings; import org.opensearch.repositories.fs.FsRepository; import org.opensearch.rest.BaseRestHandler; import org.opensearch.script.ScriptService; @@ -709,9 +708,6 @@ public void apply(Settings value, Settings current, Settings previous) { CpuBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT, CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT, - IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, - IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT, - IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT, IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, // Concurrent segment search settings diff --git a/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java b/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java index 621f90e80454c..546ae07cde221 100644 --- a/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java +++ b/server/src/main/java/org/opensearch/node/resource/tracker/NodeResourceUsageTracker.java @@ -8,7 +8,6 @@ package org.opensearch.node.resource.tracker; -import org.apache.lucene.util.Constants; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -70,9 +69,6 @@ public IoUsageStats getIoUsageStats() { * Checks if all of the resource usage trackers are ready */ public boolean isReady() { - if (Constants.LINUX) { - return memoryUsageTracker.isReady() && cpuUsageTracker.isReady() && ioUsageTracker.isReady(); - } return memoryUsageTracker.isReady() && cpuUsageTracker.isReady(); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java index 5b842ff0d3399..adca6992833bd 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java @@ -10,13 +10,11 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.util.Constants; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; -import org.opensearch.ratelimitting.admissioncontrol.controllers.IoBasedAdmissionController; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; @@ -28,7 +26,6 @@ import java.util.concurrent.ConcurrentMap; import static org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER; -import static org.opensearch.ratelimitting.admissioncontrol.controllers.IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER; /** * Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch. @@ -61,18 +58,15 @@ public AdmissionControlService( this.clusterService = clusterService; this.settings = settings; this.resourceUsageCollectorService = resourceUsageCollectorService; - this.initialize(); + this.initialise(); } /** * Initialise and Register all the admissionControllers */ - private void initialize() { + private void initialise() { // Initialise different type of admission controllers registerAdmissionController(CPU_BASED_ADMISSION_CONTROLLER); - if (Constants.LINUX) { - registerAdmissionController(IO_BASED_ADMISSION_CONTROLLER); - } } /** @@ -107,13 +101,6 @@ private AdmissionController controllerFactory(String admissionControllerName) { this.clusterService, this.settings ); - case IO_BASED_ADMISSION_CONTROLLER: - return new IoBasedAdmissionController( - admissionControllerName, - this.resourceUsageCollectorService, - this.clusterService, - this.settings - ); default: throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName); } @@ -141,7 +128,7 @@ public AdmissionController getAdmissionController(String controllerName) { */ public AdmissionControlStats stats() { List statsList = new ArrayList<>(); - if (!this.admissionControllers.isEmpty()) { + if (this.admissionControllers.size() > 0) { this.admissionControllers.forEach((controllerName, admissionController) -> { AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController); statsList.add(admissionControllerStats); diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java index f5bb5fa660e7f..2246ce34dd399 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java @@ -24,6 +24,7 @@ * and admission control can be applied if configured limit has been reached */ public abstract class AdmissionController { + private final String admissionControllerName; final ResourceUsageCollectorService resourceUsageCollectorService; public final Map rejectionCountMap; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java deleted file mode 100644 index ad6cc3ff378f0..0000000000000 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionController.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ratelimitting.admissioncontrol.controllers; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.Settings; -import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; -import org.opensearch.node.NodeResourceUsageStats; -import org.opensearch.node.ResourceUsageCollectorService; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; -import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings; - -import java.util.Locale; -import java.util.Optional; - -/** - * Class for IO Based Admission Controller in OpenSearch, which aims to provide IO utilisation admission control. - * It provides methods to apply admission control if configured limit has been reached - */ -public class IoBasedAdmissionController extends AdmissionController { - public static final String IO_BASED_ADMISSION_CONTROLLER = "global_io_usage"; - private static final Logger LOGGER = LogManager.getLogger(IoBasedAdmissionController.class); - public IoBasedAdmissionControllerSettings settings; - - /** - * @param admissionControllerName name of the admissionController - * @param resourceUsageCollectorService instance used to get resource usage stats of the node - * @param clusterService instance of the clusterService - */ - public IoBasedAdmissionController( - String admissionControllerName, - ResourceUsageCollectorService resourceUsageCollectorService, - ClusterService clusterService, - Settings settings - ) { - super(admissionControllerName, resourceUsageCollectorService, clusterService); - this.settings = new IoBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings); - } - - /** - * Apply admission control based on the resource usage for an action - * - * @param action is the transport action - * @param admissionControlActionType type of admissionControlActionType - */ - @Override - public void apply(String action, AdmissionControlActionType admissionControlActionType) { - if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) { - this.applyForTransportLayer(action, admissionControlActionType); - } - } - - /** - * Apply transport layer admission control if configured limit has been reached - */ - private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) { - if (isLimitsBreached(actionName, admissionControlActionType)) { - this.addRejectionCount(admissionControlActionType.getType(), 1); - if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) { - throw new OpenSearchRejectedExecutionException( - String.format( - Locale.ROOT, - "Io usage admission controller rejected the request for action [%s] as IO limit reached", - admissionControlActionType.name() - ) - ); - } - } - } - - /** - * Check if the configured resource usage limits are breached for the action - */ - private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) { - // check if cluster state is ready - if (clusterService.state() != null && clusterService.state().nodes() != null) { - long ioUsageThreshold = this.getIoRejectionThreshold(admissionControlActionType); - Optional nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics( - this.clusterService.state().nodes().getLocalNodeId() - ); - if (nodePerformanceStatistics.isPresent()) { - double ioUsage = nodePerformanceStatistics.get().getIoUsageStats().getIoUtilisationPercent(); - if (ioUsage >= ioUsageThreshold) { - LOGGER.warn( - "IoBasedAdmissionController limit reached as the current IO " - + "usage [{}] exceeds the allowed limit [{}] for transport action [{}] in admissionControlMode [{}]", - ioUsage, - ioUsageThreshold, - actionName, - this.settings.getTransportLayerAdmissionControllerMode() - ); - return true; - } - } - } - return false; - } - - /** - * Get IO rejection threshold based on action type - */ - private long getIoRejectionThreshold(AdmissionControlActionType admissionControlActionType) { - switch (admissionControlActionType) { - case SEARCH: - return this.settings.getSearchIOUsageLimit(); - case INDEXING: - return this.settings.getIndexingIOUsageLimit(); - default: - throw new IllegalArgumentException( - String.format( - Locale.ROOT, - "Admission control not Supported for AdmissionControlActionType: %s", - admissionControlActionType.getType() - ) - ); - } - } -} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java deleted file mode 100644 index e58ed28d21605..0000000000000 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettings.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ratelimitting.admissioncontrol.settings; - -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; -import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; - -/** - * Settings related to IO based admission controller. - * @opensearch.internal - */ -public class IoBasedAdmissionControllerSettings { - - /** - * Default parameters for the IoBasedAdmissionControllerSettings - */ - public static class Defaults { - public static final long IO_USAGE_LIMIT = 95; - } - - private AdmissionControlMode transportLayerMode; - private Long searchIOUsageLimit; - private Long indexingIOUsageLimit; - - /** - * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set - * rejection will be performed, otherwise only rejection metrics will be populated. - */ - public static final Setting IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE = new Setting<>( - "admission_control.transport.io_usage.mode_override", - AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, - AdmissionControlMode::fromName, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * This setting used to set the IO Limits for the search requests by default it will use default IO usage limit - */ - public static final Setting SEARCH_IO_USAGE_LIMIT = Setting.longSetting( - "admission_control.search.io_usage.limit", - Defaults.IO_USAGE_LIMIT, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * This setting used to set the IO limits for the indexing requests by default it will use default IO usage limit - */ - public static final Setting INDEXING_IO_USAGE_LIMIT = Setting.longSetting( - "admission_control.indexing.io_usage.limit", - Defaults.IO_USAGE_LIMIT, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - public IoBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) { - this.transportLayerMode = IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); - clusterSettings.addSettingsUpdateConsumer(IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); - this.searchIOUsageLimit = SEARCH_IO_USAGE_LIMIT.get(settings); - this.indexingIOUsageLimit = INDEXING_IO_USAGE_LIMIT.get(settings); - clusterSettings.addSettingsUpdateConsumer(INDEXING_IO_USAGE_LIMIT, this::setIndexingIOUsageLimit); - clusterSettings.addSettingsUpdateConsumer(SEARCH_IO_USAGE_LIMIT, this::setSearchIOUsageLimit); - } - - public void setIndexingIOUsageLimit(Long indexingIOUsageLimit) { - this.indexingIOUsageLimit = indexingIOUsageLimit; - } - - public void setSearchIOUsageLimit(Long searchIOUsageLimit) { - this.searchIOUsageLimit = searchIOUsageLimit; - } - - public AdmissionControlMode getTransportLayerAdmissionControllerMode() { - return transportLayerMode; - } - - public void setTransportLayerMode(AdmissionControlMode transportLayerMode) { - this.transportLayerMode = transportLayerMode; - } - - public Long getIndexingIOUsageLimit() { - return indexingIOUsageLimit; - } - - public Long getSearchIOUsageLimit() { - return searchIOUsageLimit; - } -} diff --git a/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java b/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java index 6dd90784ab65f..f2ee0e61c4953 100644 --- a/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java +++ b/server/src/test/java/org/opensearch/node/ResourceUsageCollectorServiceTests.java @@ -14,21 +14,24 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.node.resource.tracker.NodeResourceUsageTracker; import org.opensearch.node.resource.tracker.ResourceTrackerSettings; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; import org.junit.After; +import org.junit.Before; import java.util.Map; import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; -import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.ClusterServiceUtils.createClusterService; import static org.hamcrest.Matchers.greaterThan; /** @@ -36,50 +39,61 @@ * are working as expected */ public class ResourceUsageCollectorServiceTests extends OpenSearchSingleNodeTestCase { - @Override - protected boolean resetNodeAfterTest() { - return true; - } - @Override - protected Settings nodeSettings() { - return Settings.builder() - .put(super.nodeSettings()) - .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) - .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) - .put(ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(5000)) - .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + private ClusterService clusterService; + private ResourceUsageCollectorService collector; + private ThreadPool threadpool; + NodeResourceUsageTracker tracker; + + @Before + public void setUp() throws Exception { + super.setUp(); + + threadpool = new TestThreadPool("resource_usage_collector_tests"); + + clusterService = createClusterService(threadpool); + + Settings settings = Settings.builder() + .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), new TimeValue(500, TimeUnit.MILLISECONDS)) .build(); + tracker = new NodeResourceUsageTracker( + null, + threadpool, + settings, + new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + collector = new ResourceUsageCollectorService(tracker, clusterService, threadpool); + tracker.start(); + collector.start(); } @After - public void cleanup() { - assertAcked( - client().admin() - .cluster() - .prepareUpdateSettings() - .setPersistentSettings(Settings.builder().putNull("*")) - .setTransientSettings(Settings.builder().putNull("*")) - ); + public void tearDown() throws Exception { + super.tearDown(); + threadpool.shutdownNow(); + clusterService.close(); + collector.stop(); + tracker.stop(); + collector.close(); + tracker.close(); } public void testResourceUsageStats() { - ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class); - resourceUsageCollectorService.collectNodeResourceUsageStats("node1", System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); - Map nodeStats = resourceUsageCollectorService.getAllNodeStatistics(); + collector.collectNodeResourceUsageStats("node1", System.currentTimeMillis(), 97, 99, new IoUsageStats(98)); + Map nodeStats = collector.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); assertEquals(99.0, nodeStats.get("node1").cpuUtilizationPercent, 0.0); assertEquals(97.0, nodeStats.get("node1").memoryUtilizationPercent, 0.0); assertEquals(98, nodeStats.get("node1").getIoUsageStats().getIoUtilisationPercent(), 0.0); - Optional nodeResourceUsageStatsOptional = resourceUsageCollectorService.getNodeStatistics("node1"); + Optional nodeResourceUsageStatsOptional = collector.getNodeStatistics("node1"); assertNotNull(nodeResourceUsageStatsOptional.get()); assertEquals(99.0, nodeResourceUsageStatsOptional.get().cpuUtilizationPercent, 0.0); assertEquals(97.0, nodeResourceUsageStatsOptional.get().memoryUtilizationPercent, 0.0); assertEquals(98, nodeResourceUsageStatsOptional.get().getIoUsageStats().getIoUtilisationPercent(), 0.0); - nodeResourceUsageStatsOptional = resourceUsageCollectorService.getNodeStatistics("node2"); + nodeResourceUsageStatsOptional = collector.getNodeStatistics("node2"); assertTrue(nodeResourceUsageStatsOptional.isEmpty()); } @@ -87,29 +101,26 @@ public void testScheduler() throws Exception { /** * Wait for cluster state to be ready so that localNode().getId() is ready and we add the values to the map */ - ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class); - ClusterService clusterService = getInstanceFromNode(ClusterService.class); - assertBusy(() -> assertEquals(1, resourceUsageCollectorService.getAllNodeStatistics().size())); - + assertBusy(() -> assertTrue(collector.getNodeStatistics(clusterService.localNode().getId()).isPresent()), 1, TimeUnit.MINUTES); + assertTrue(collector.getNodeStatistics(clusterService.localNode().getId()).isPresent()); /** * Wait for memory utilization to be reported greater than 0 */ assertBusy( () -> assertThat( - resourceUsageCollectorService.getNodeStatistics(clusterService.localNode().getId()).get().getMemoryUtilizationPercent(), + collector.getNodeStatistics(clusterService.localNode().getId()).get().getMemoryUtilizationPercent(), greaterThan(0.0) ), 5, TimeUnit.SECONDS ); - assertTrue(resourceUsageCollectorService.getNodeStatistics("Invalid").isEmpty()); + assertTrue(collector.getNodeStatistics("Invalid").isEmpty()); } /* * Test that concurrently adding values and removing nodes does not cause exceptions */ public void testConcurrentAddingAndRemovingNodes() throws Exception { - ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class); String[] nodes = new String[] { "a", "b", "c", "d" }; final CountDownLatch latch = new CountDownLatch(5); @@ -123,9 +134,9 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception { } for (int i = 0; i < randomIntBetween(100, 200); i++) { if (randomBoolean()) { - resourceUsageCollectorService.removeNodeResourceUsageStats(randomFrom(nodes)); + collector.removeNodeResourceUsageStats(randomFrom(nodes)); } - resourceUsageCollectorService.collectNodeResourceUsageStats( + collector.collectNodeResourceUsageStats( randomFrom(nodes), System.currentTimeMillis(), randomIntBetween(1, 100), @@ -150,7 +161,7 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception { t3.join(); t4.join(); - final Map nodeStats = resourceUsageCollectorService.getAllNodeStatistics(); + final Map nodeStats = collector.getAllNodeStatistics(); for (String nodeId : nodes) { if (nodeStats.containsKey(nodeId)) { assertThat(nodeStats.get(nodeId).memoryUtilizationPercent, greaterThan(0.0)); @@ -161,15 +172,14 @@ public void testConcurrentAddingAndRemovingNodes() throws Exception { } public void testNodeRemoval() { - ResourceUsageCollectorService resourceUsageCollectorService = getInstanceFromNode(ResourceUsageCollectorService.class); - resourceUsageCollectorService.collectNodeResourceUsageStats( + collector.collectNodeResourceUsageStats( "node1", System.currentTimeMillis(), randomIntBetween(1, 100), randomIntBetween(1, 100), new IoUsageStats(randomIntBetween(1, 100)) ); - resourceUsageCollectorService.collectNodeResourceUsageStats( + collector.collectNodeResourceUsageStats( "node2", System.currentTimeMillis(), randomIntBetween(1, 100), @@ -189,8 +199,8 @@ public void testNodeRemoval() { .build(); ClusterChangedEvent event = new ClusterChangedEvent("test", newState, previousState); - resourceUsageCollectorService.clusterChanged(event); - final Map nodeStats = resourceUsageCollectorService.getAllNodeStatistics(); + collector.clusterChanged(event); + final Map nodeStats = collector.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); assertFalse(nodeStats.containsKey("node2")); } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java index 4f615290f1805..7a67ffc8c7c5d 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java @@ -8,7 +8,6 @@ package org.opensearch.ratelimitting.admissioncontrol; -import org.apache.lucene.util.Constants; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; @@ -49,21 +48,13 @@ public void tearDown() throws Exception { public void testWhenAdmissionControllerRegistered() { admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); - if (Constants.LINUX) { - assertEquals(admissionControlService.getAdmissionControllers().size(), 2); - } else { - assertEquals(admissionControlService.getAdmissionControllers().size(), 1); - } + assertEquals(admissionControlService.getAdmissionControllers().size(), 1); } public void testRegisterInvalidAdmissionController() { String test = "TEST"; admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); - if (Constants.LINUX) { - assertEquals(admissionControlService.getAdmissionControllers().size(), 2); - } else { - assertEquals(admissionControlService.getAdmissionControllers().size(), 1); - } + assertEquals(admissionControlService.getAdmissionControllers().size(), 1); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, () -> admissionControlService.registerAdmissionController(test) @@ -75,11 +66,7 @@ public void testAdmissionControllerSettings() { admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings; List admissionControllerList = admissionControlService.getAdmissionControllers(); - if (Constants.LINUX) { - assertEquals(admissionControllerList.size(), 2); - } else { - assertEquals(admissionControllerList.size(), 1); - } + assertEquals(admissionControllerList.size(), 1); CpuBasedAdmissionController cpuBasedAdmissionController = (CpuBasedAdmissionController) admissionControlService .getAdmissionController(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); assertEquals( @@ -145,11 +132,7 @@ public void testApplyAdmissionControllerEnabled() { .build(); clusterService.getClusterSettings().applySettings(settings); List admissionControllerList = admissionControlService.getAdmissionControllers(); - if (Constants.LINUX) { - assertEquals(admissionControllerList.size(), 2); - } else { - assertEquals(admissionControllerList.size(), 1); - } + assertEquals(admissionControllerList.size(), 1); } public void testApplyAdmissionControllerEnforced() { @@ -170,10 +153,6 @@ public void testApplyAdmissionControllerEnforced() { .build(); clusterService.getClusterSettings().applySettings(settings); List admissionControllerList = admissionControlService.getAdmissionControllers(); - if (Constants.LINUX) { - assertEquals(admissionControllerList.size(), 2); - } else { - assertEquals(admissionControllerList.size(), 1); - } + assertEquals(admissionControllerList.size(), 1); } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java index 5ea062c19489e..a1694b2c3cee2 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java @@ -8,7 +8,6 @@ package org.opensearch.ratelimitting.admissioncontrol; -import org.apache.lucene.util.Constants; import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.action.admin.cluster.state.ClusterStateResponse; @@ -22,22 +21,15 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.node.resource.tracker.ResourceTrackerSettings; -import org.opensearch.ratelimitting.admissioncontrol.controllers.CpuBasedAdmissionController; -import org.opensearch.ratelimitting.admissioncontrol.controllers.IoBasedAdmissionController; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; import org.opensearch.test.OpenSearchSingleNodeTestCase; import org.junit.After; -import java.util.HashMap; -import java.util.Map; - import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT; import static org.opensearch.ratelimitting.admissioncontrol.settings.CpuBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT; -import static org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT; -import static org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.is; @@ -46,8 +38,6 @@ */ public class AdmissionControlSingleNodeTests extends OpenSearchSingleNodeTestCase { - public static final String INDEX_NAME = "test_index"; - @Override protected boolean resetNodeAfterTest() { return true; @@ -55,7 +45,6 @@ protected boolean resetNodeAfterTest() { @After public void cleanup() { - client().admin().indices().prepareDelete(INDEX_NAME).get(); assertAcked( client().admin() .cluster() @@ -71,7 +60,6 @@ protected Settings nodeSettings() { .put(super.nodeSettings()) .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) - .put(ResourceTrackerSettings.GLOBAL_IO_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(5000)) .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0) .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0) @@ -81,10 +69,11 @@ protected Settings nodeSettings() { public void testAdmissionControlRejectionEnforcedMode() throws Exception { ensureGreen(); assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); - client().admin().indices().prepareCreate(INDEX_NAME).execute().actionGet(); + // Thread.sleep(700); + client().admin().indices().prepareCreate("index").execute().actionGet(); BulkRequestBuilder bulk = client().prepareBulk(); for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); } // Verify that cluster state is updated ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); @@ -94,116 +83,24 @@ public void testAdmissionControlRejectionEnforcedMode() throws Exception { BulkResponse res = client().bulk(bulk.request()).actionGet(); assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus()); AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); - Map acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals( - 1, - (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .get(AdmissionControlActionType.INDEXING.getType()) - ); - if (Constants.LINUX) { - assertEquals( - 0, - (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .getOrDefault(AdmissionControlActionType.INDEXING.getType(), 0L) - ); - } else { - assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } - client().admin().indices().prepareRefresh(INDEX_NAME).get(); - - // verify search request hits 429 - SearchRequest searchRequest = new SearchRequest(INDEX_NAME); - try { - client().search(searchRequest).actionGet(); - } catch (Exception e) { - assertTrue(((SearchPhaseExecutionException) e).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); - } - acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals( - 1, - (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .get(AdmissionControlActionType.SEARCH.getType()) - ); - if (Constants.LINUX) { - assertEquals( - 0, - (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L) - ); - } else { - assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } - ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.transientSettings( - Settings.builder() - .put(super.nodeSettings()) - .put(SEARCH_IO_USAGE_LIMIT.getKey(), 0) - .put(INDEXING_IO_USAGE_LIMIT.getKey(), 0) - .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101) - .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101) - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - - bulk = client().prepareBulk(); - for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); - } - res = client().bulk(bulk.request()).actionGet(); - if (Constants.LINUX) { - assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus()); - } - admissionControlService = getInstanceFromNode(AdmissionControlService.class); - acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals( - 1, - (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .get(AdmissionControlActionType.INDEXING.getType()) - ); - if (Constants.LINUX) { - assertEquals( - 1, - (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .getOrDefault(AdmissionControlActionType.INDEXING.getType(), 0L) - ); - } else { - assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } - client().admin().indices().prepareRefresh(INDEX_NAME).get(); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + client().admin().indices().prepareRefresh("index").get(); // verify search request hits 429 - searchRequest = new SearchRequest(INDEX_NAME); + SearchRequest searchRequest = new SearchRequest("index"); try { client().search(searchRequest).actionGet(); } catch (Exception e) { assertTrue(((SearchPhaseExecutionException) e).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); } - acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals( - 1, - (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .get(AdmissionControlActionType.SEARCH.getType()) - ); - if (Constants.LINUX) { - assertEquals( - 1, - (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L) - ); - } else { - assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); } public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); assertThat(future2.isDone(), is(true)); @@ -217,91 +114,27 @@ public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { BulkRequestBuilder bulk = client().prepareBulk(); for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); } // verify bulk request success but admission control having rejections stats BulkResponse res = client().bulk(bulk.request()).actionGet(); assertFalse(res.hasFailures()); AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); - Map acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals( - 1, - (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .get(AdmissionControlActionType.INDEXING.getType()) - ); - client().admin().indices().prepareRefresh(INDEX_NAME).get(); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + client().admin().indices().prepareRefresh("index").get(); // verify search request success but admission control having rejections stats - SearchRequest searchRequest = new SearchRequest(INDEX_NAME); + SearchRequest searchRequest = new SearchRequest("index"); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(3, searchResponse.getHits().getHits().length); - acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals( - 1, - (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .get(AdmissionControlActionType.SEARCH.getType()) - ); - - updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.transientSettings( - Settings.builder() - .put(super.nodeSettings()) - .put(SEARCH_IO_USAGE_LIMIT.getKey(), 0) - .put(INDEXING_IO_USAGE_LIMIT.getKey(), 0) - .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101) - .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101) - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - bulk = client().prepareBulk(); - for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); - } - // verify bulk request success but admission control having rejections stats - res = client().bulk(bulk.request()).actionGet(); - assertFalse(res.hasFailures()); - acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals( - 1, - (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .get(AdmissionControlActionType.INDEXING.getType()) - ); - if (Constants.LINUX) { - assertEquals( - 1, - (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .getOrDefault(AdmissionControlActionType.INDEXING.getType(), 0L) - ); - } else { - assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } - searchRequest = new SearchRequest(INDEX_NAME); - searchResponse = client().search(searchRequest).actionGet(); - assertEquals(3, searchResponse.getHits().getHits().length); - acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals( - 1, - (long) acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .get(AdmissionControlActionType.SEARCH.getType()) - ); - if (Constants.LINUX) { - assertEquals( - 1, - (long) acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER) - .getRejectionCount() - .getOrDefault(AdmissionControlActionType.SEARCH.getType(), 0L) - ); - } else { - assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); } public void testAdmissionControlRejectionDisabledMode() throws Exception { assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); assertThat(future2.isDone(), is(true)); @@ -313,62 +146,28 @@ public void testAdmissionControlRejectionDisabledMode() throws Exception { BulkRequestBuilder bulk = client().prepareBulk(); for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); } // verify bulk request success and no rejections BulkResponse res = client().bulk(bulk.request()).actionGet(); assertFalse(res.hasFailures()); AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); - Map acStats = this.getAdmissionControlStats(admissionControlService); - - assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size()); - client().admin().indices().prepareRefresh(INDEX_NAME).get(); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + client().admin().indices().prepareRefresh("index").get(); // verify search request success and no rejections - SearchRequest searchRequest = new SearchRequest(INDEX_NAME); + SearchRequest searchRequest = new SearchRequest("index"); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(3, searchResponse.getHits().getHits().length); - acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size()); - updateSettingsRequest = new ClusterUpdateSettingsRequest(); - updateSettingsRequest.transientSettings( - Settings.builder() - .put(super.nodeSettings()) - .put(SEARCH_IO_USAGE_LIMIT.getKey(), 0) - .put(INDEXING_IO_USAGE_LIMIT.getKey(), 0) - .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101) - .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101) - ); - assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); - bulk = client().prepareBulk(); - for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); - } - // verify bulk request success but admission control having rejections stats - res = client().bulk(bulk.request()).actionGet(); - assertFalse(res.hasFailures()); - acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size()); - if (Constants.LINUX) { - assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size()); - } else { - assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); - searchRequest = new SearchRequest(INDEX_NAME); - searchResponse = client().search(searchRequest).actionGet(); - assertEquals(3, searchResponse.getHits().getHits().length); - acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size()); - if (Constants.LINUX) { - assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size()); - } else { - assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } } public void testAdmissionControlWithinLimits() throws Exception { assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); assertThat(future2.isDone(), is(true)); @@ -379,46 +178,26 @@ public void testAdmissionControlWithinLimits() throws Exception { .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101) .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101) - .put(SEARCH_IO_USAGE_LIMIT.getKey(), 101) - .put(INDEXING_IO_USAGE_LIMIT.getKey(), 101) ); assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); BulkRequestBuilder bulk = client().prepareBulk(); for (int i = 0; i < 3; i++) { - bulk.add(client().prepareIndex(INDEX_NAME).setSource("foo", "bar " + i)); + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); } // verify bulk request success and no rejections BulkResponse res = client().bulk(bulk.request()).actionGet(); assertFalse(res.hasFailures()); AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); - Map acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size()); - if (Constants.LINUX) { - assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size()); - } else { - assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } - client().admin().indices().prepareRefresh(INDEX_NAME).get(); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + client().admin().indices().prepareRefresh("index").get(); // verify search request success and no rejections - SearchRequest searchRequest = new SearchRequest(INDEX_NAME); + SearchRequest searchRequest = new SearchRequest("index"); SearchResponse searchResponse = client().search(searchRequest).actionGet(); assertEquals(3, searchResponse.getHits().getHits().length); - acStats = this.getAdmissionControlStats(admissionControlService); - assertEquals(0, acStats.get(CpuBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER).getRejectionCount().size()); - if (Constants.LINUX) { - assertEquals(0, acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount().size()); - } else { - assertNull(acStats.get(IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER)); - } - } - - Map getAdmissionControlStats(AdmissionControlService admissionControlService) { - Map acStats = new HashMap<>(); - for (AdmissionControllerStats admissionControllerStats : admissionControlService.stats().getAdmissionControllerStatsList()) { - acStats.put(admissionControllerStats.getAdmissionControllerName(), admissionControllerStats); - } - return acStats; + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java deleted file mode 100644 index c5a2208f49ce6..0000000000000 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/IoBasedAdmissionControllerTests.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ratelimitting.admissioncontrol.controllers; - -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Settings; -import org.opensearch.node.ResourceUsageCollectorService; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; -import org.opensearch.ratelimitting.admissioncontrol.settings.IoBasedAdmissionControllerSettings; -import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; - -import org.mockito.Mockito; - -public class IoBasedAdmissionControllerTests extends OpenSearchTestCase { - private ClusterService clusterService; - private ThreadPool threadPool; - IoBasedAdmissionController admissionController = null; - String action = "TEST_ACTION"; - - @Override - public void setUp() throws Exception { - super.setUp(); - threadPool = new TestThreadPool("admission_controller_settings_test"); - clusterService = new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool - ); - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - threadPool.shutdownNow(); - } - - public void testCheckDefaultParameters() { - admissionController = new IoBasedAdmissionController( - IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER, - null, - clusterService, - Settings.EMPTY - ); - assertEquals(admissionController.getName(), IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER); - assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); - assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); - assertFalse( - admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode()) - ); - } - - public void testCheckUpdateSettings() { - admissionController = new IoBasedAdmissionController( - IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER, - null, - clusterService, - Settings.EMPTY - ); - Settings settings = Settings.builder() - .put( - IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.ENFORCED.getMode() - ) - .build(); - clusterService.getClusterSettings().applySettings(settings); - assertEquals(admissionController.getName(), IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER); - assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); - assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); - assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); - } - - public void testApplyControllerWithDefaultSettings() { - ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); - admissionController = new IoBasedAdmissionController( - IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER, - rs, - clusterService, - Settings.EMPTY - ); - assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); - assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); - action = "indices:data/write/bulk[s][p]"; - admissionController.apply(action, AdmissionControlActionType.INDEXING); - assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); - } - - public void testApplyControllerWhenSettingsEnabled() throws Exception { - Settings settings = Settings.builder() - .put( - IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.ENFORCED.getMode() - ) - .build(); - ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); - admissionController = new IoBasedAdmissionController( - IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER, - rs, - clusterService, - settings - ); - assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); - assertTrue( - admissionController.isAdmissionControllerEnforced(admissionController.settings.getTransportLayerAdmissionControllerMode()) - ); - assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); - } - - public void testRejectionCount() { - Settings settings = Settings.builder() - .put( - IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.ENFORCED.getMode() - ) - .build(); - ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); - admissionController = new IoBasedAdmissionController( - IoBasedAdmissionController.IO_BASED_ADMISSION_CONTROLLER, - rs, - clusterService, - settings - ); - admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1); - admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 3); - assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.SEARCH.getType()), 1); - assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 3); - admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1); - admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 2); - assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.SEARCH.getType()), 2); - assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 5); - } -} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java similarity index 98% rename from server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java rename to server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java index 9ce28bc7fdb40..11688e2f30d4b 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettingsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java @@ -20,7 +20,7 @@ import java.util.Arrays; import java.util.Set; -public class CPUBasedAdmissionControllerSettingsTests extends OpenSearchTestCase { +public class CPUBasedAdmissionControlSettingsTests extends OpenSearchTestCase { private ClusterService clusterService; private ThreadPool threadPool; diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java deleted file mode 100644 index ff777c175ec0e..0000000000000 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/IoBasedAdmissionControllerSettingsTests.java +++ /dev/null @@ -1,160 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ratelimitting.admissioncontrol.settings; - -import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; -import org.opensearch.common.settings.Setting; -import org.opensearch.common.settings.Settings; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; -import org.opensearch.test.OpenSearchTestCase; -import org.opensearch.threadpool.TestThreadPool; -import org.opensearch.threadpool.ThreadPool; - -import java.util.Arrays; -import java.util.Set; - -public class IoBasedAdmissionControllerSettingsTests extends OpenSearchTestCase { - private ClusterService clusterService; - private ThreadPool threadPool; - - @Override - public void setUp() throws Exception { - super.setUp(); - threadPool = new TestThreadPool("io_based_admission_controller_settings_test"); - clusterService = new ClusterService( - Settings.EMPTY, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool - ); - } - - @Override - public void tearDown() throws Exception { - super.tearDown(); - threadPool.shutdownNow(); - } - - public void testSettingsExists() { - Set> settings = ClusterSettings.BUILT_IN_CLUSTER_SETTINGS; - assertTrue( - "All the IO based admission controller settings should be supported built in settings", - settings.containsAll( - Arrays.asList( - IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, - IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT, - IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT - ) - ) - ); - } - - public void testDefaultSettings() { - IoBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IoBasedAdmissionControllerSettings( - clusterService.getClusterSettings(), - Settings.EMPTY - ); - long percent = 95; - assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); - assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), percent); - assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), percent); - } - - public void testGetConfiguredSettings() { - long percent = 95; - long indexingPercent = 85; - Settings settings = Settings.builder() - .put( - IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.ENFORCED.getMode() - ) - .put(IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT.getKey(), indexingPercent) - .build(); - - IoBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IoBasedAdmissionControllerSettings( - clusterService.getClusterSettings(), - settings - ); - assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); - assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), percent); - assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), indexingPercent); - } - - public void testUpdateAfterGetDefaultSettings() { - long percent = 95; - long searchPercent = 80; - IoBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IoBasedAdmissionControllerSettings( - clusterService.getClusterSettings(), - Settings.EMPTY - ); - Settings settings = Settings.builder() - .put( - IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.ENFORCED.getMode() - ) - .put(IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT.getKey(), searchPercent) - .build(); - - clusterService.getClusterSettings().applySettings(settings); - assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); - assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent); - assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), percent); - } - - public void testUpdateAfterGetConfiguredSettings() { - long percent = 95; - long indexingPercent = 85; - long searchPercent = 80; - Settings settings = Settings.builder() - .put( - IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.ENFORCED.getMode() - ) - .put(IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT.getKey(), searchPercent) - .build(); - - IoBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IoBasedAdmissionControllerSettings( - clusterService.getClusterSettings(), - settings - ); - assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); - assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent); - assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), percent); - - Settings updatedSettings = Settings.builder() - .put( - IoBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), - AdmissionControlMode.MONITOR.getMode() - ) - .put(IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT.getKey(), indexingPercent) - .build(); - clusterService.getClusterSettings().applySettings(updatedSettings); - assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.MONITOR); - assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent); - assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), indexingPercent); - - searchPercent = 70; - updatedSettings = Settings.builder() - .put(updatedSettings) - .put(IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT.getKey(), searchPercent) - .build(); - - clusterService.getClusterSettings().applySettings(updatedSettings); - assertEquals(ioBasedAdmissionControllerSettings.getSearchIOUsageLimit().longValue(), searchPercent); - assertEquals(ioBasedAdmissionControllerSettings.getIndexingIOUsageLimit().longValue(), indexingPercent); - } -}