From 25c16b65453c769f548556945b123632a6d32861 Mon Sep 17 00:00:00 2001 From: Ajay Kumar Movva Date: Wed, 23 Aug 2023 01:55:40 +0530 Subject: [PATCH] Admission Controller Module Transport Interceptor Initial Commit Signed-off-by: Ajay Kumar Movva --- CHANGELOG.md | 1 + modules/throttling/build.gradle | 34 ++++ .../OpenSearchThrottlingModulePlugin.java | 53 ++++++ .../opensearch/throttling/package-info.java | 12 ++ ...OpenSearchThrottlingModulePluginTests.java | 67 ++++++++ .../common/settings/ClusterSettings.java | 6 + .../main/java/org/opensearch/node/Node.java | 8 + .../AdmissionControlService.java | 153 +++++++++++++++++ .../AdmissionControlSettings.java | 125 ++++++++++++++ .../controllers/AdmissionController.java | 55 ++++++ .../IOBasedAdmissionController.java | 97 +++++++++++ .../controllers/package-info.java | 12 ++ .../enums/AdmissionControlMode.java | 66 ++++++++ .../enums/TransportRequestActionType.java | 50 ++++++ .../enums/package-info.java | 12 ++ .../admissioncontroller/package-info.java | 12 ++ .../IOBasedAdmissionControllerSettings.java | 94 +++++++++++ .../settings/package-info.java | 11 ++ .../AdmissionControlTransportHandler.java | 78 +++++++++ .../AdmissionControlTransportInterceptor.java | 40 +++++ .../transport/package-info.java | 11 ++ .../opensearch/throttling/package-info.java | 12 ++ .../AdmissionControlServiceTests.java | 153 +++++++++++++++++ .../AdmissionControlSettingsTests.java | 133 +++++++++++++++ .../IOBasedAdmissionControllerTests.java | 158 ++++++++++++++++++ .../IOBasedAdmissionControlSettingsTests.java | 152 +++++++++++++++++ ...AdmissionControlTransportHandlerTests.java | 84 ++++++++++ settings.gradle | 3 + 28 files changed, 1692 insertions(+) create mode 100644 modules/throttling/build.gradle create mode 100644 modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java create mode 100644 modules/throttling/src/main/java/org/opensearch/throttling/package-info.java create mode 100644 modules/throttling/src/test/java/org/opensearch/throttling/OpenSearchThrottlingModulePluginTests.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControlService.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControlSettings.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/AdmissionControlMode.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/TransportRequestActionType.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/package-info.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettings.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/package-info.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportHandler.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportInterceptor.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java create mode 100644 server/src/main/java/org/opensearch/throttling/package-info.java create mode 100644 server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControlServiceTests.java create mode 100644 server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControlSettingsTests.java create mode 100644 server/src/test/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionControllerTests.java create mode 100644 server/src/test/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControlSettingsTests.java create mode 100644 server/src/test/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportHandlerTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 25669b67fcb63..7d09ba1a18f7b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107)) - Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679)) - Added encryption-sdk lib to provide encryption and decryption capabilities ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466)) +- Admission Controller Module Transport Interceptor Initial Commit ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/modules/throttling/build.gradle b/modules/throttling/build.gradle new file mode 100644 index 0000000000000..88e2e9390a741 --- /dev/null +++ b/modules/throttling/build.gradle @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +opensearchplugin { + description 'Plugin intercepting requests and throttle based on resource consumption' + classname 'org.opensearch.throttling.OpenSearchThrottlingModulePlugin' +} diff --git a/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java b/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java new file mode 100644 index 0000000000000..336da7b7956b3 --- /dev/null +++ b/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling; + +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.throttling.admissioncontroller.AdmissionControlService; +import org.opensearch.throttling.admissioncontroller.transport.AdmissionControlTransportInterceptor; +import org.opensearch.transport.TransportInterceptor; + +import java.util.ArrayList; +import java.util.List; + +/** + * This plugin is used to register handlers to intercept both rest and transport requests. + */ +public class OpenSearchThrottlingModulePlugin extends Plugin implements NetworkPlugin { + + AdmissionControlService admissionControlService; + + /** + * Default Constructor for plugin + */ + public OpenSearchThrottlingModulePlugin() {} + + /** + * Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing + * transport (inter-node) requests. This must not return null + * + * @param namedWriteableRegistry registry of all named writeables registered + * @param threadContext a {@link ThreadContext} of the current nodes or clients that can be used to set additional + * headers in the interceptors + * @return list of transport interceptors + */ + @Override + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { + this.admissionControlService = AdmissionControlService.getInstance(); + List interceptors = new ArrayList<>(0); + // TODO Will throw exception in next PR's. This needs to ensure the service is up before adding into transport interceptor. + if (this.admissionControlService != null) { + interceptors.add(new AdmissionControlTransportInterceptor(this.admissionControlService)); + } + return interceptors; + } +} diff --git a/modules/throttling/src/main/java/org/opensearch/throttling/package-info.java b/modules/throttling/src/main/java/org/opensearch/throttling/package-info.java new file mode 100644 index 0000000000000..9b1d82626be5f --- /dev/null +++ b/modules/throttling/src/main/java/org/opensearch/throttling/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package contains classes related to throttling plugins + */ +package org.opensearch.throttling; diff --git a/modules/throttling/src/test/java/org/opensearch/throttling/OpenSearchThrottlingModulePluginTests.java b/modules/throttling/src/test/java/org/opensearch/throttling/OpenSearchThrottlingModulePluginTests.java new file mode 100644 index 0000000000000..29b1584e23e73 --- /dev/null +++ b/modules/throttling/src/test/java/org/opensearch/throttling/OpenSearchThrottlingModulePluginTests.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControlService; +import org.opensearch.throttling.admissioncontroller.transport.AdmissionControlTransportInterceptor; +import org.opensearch.transport.TransportInterceptor; + +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class OpenSearchThrottlingModulePluginTests extends OpenSearchTestCase { + OpenSearchThrottlingModulePlugin openSearchThrottlingModulePlugin; + private ClusterService clusterService; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testGetTransportInterceptors() { + openSearchThrottlingModulePlugin = new OpenSearchThrottlingModulePlugin(); + List interceptors = openSearchThrottlingModulePlugin.getTransportInterceptors( + mock(NamedWriteableRegistry.class), + threadPool.getThreadContext() + ); + assertEquals(interceptors.size(), 0); + AdmissionControlService admissionControlService = AdmissionControlService.newAdmissionControllerService( + Settings.EMPTY, + clusterService.getClusterSettings(), + threadPool + ); + interceptors = openSearchThrottlingModulePlugin.getTransportInterceptors( + mock(NamedWriteableRegistry.class), + threadPool.getThreadContext() + ); + assertEquals(interceptors.size(), 1); + assertEquals(interceptors.get(0).getClass(), AdmissionControlTransportInterceptor.class); + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 05938914b019f..091d3ffc0fe32 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -151,6 +151,8 @@ import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControlSettings; +import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings; import org.opensearch.transport.ProxyConnectionStrategy; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.RemoteConnectionStrategy; @@ -237,6 +239,10 @@ public void apply(Settings value, Settings current, Settings previous) { public static Set> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet( new HashSet<>( Arrays.asList( + AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + AdmissionControlSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING, + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e7c6eaebbf8ed..79ef9bed4d400 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -224,6 +224,7 @@ import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControlService; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportInterceptor; @@ -763,6 +764,12 @@ protected Node( fileCacheCleaner ); + final AdmissionControlService admissionControlService = AdmissionControlService.newAdmissionControllerService( + settings, + clusterService.getClusterSettings(), + threadPool + ); + final AliasValidator aliasValidator = new AliasValidator(); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices); @@ -1110,6 +1117,7 @@ protected Node( b.bind(IndexingPressureService.class).toInstance(indexingPressureService); b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService); b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService); + b.bind(AdmissionControlService.class).toInstance(admissionControlService); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControlService.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControlService.java new file mode 100644 index 0000000000000..466edd0d36c29 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControlService.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.controllers.AdmissionController; +import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.opensearch.throttling.admissioncontroller.AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER; + +/** + * Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch. + */ +public class AdmissionControlService { + private final ThreadPool threadPool; + public final AdmissionControlSettings admissionControlSettings; + private final ConcurrentMap ADMISSION_CONTROLLERS; + private static AdmissionControlService admissionControlService = null; + private static final Logger logger = LogManager.getLogger(AdmissionControlService.class); + private final ClusterSettings clusterSettings; + private final Settings settings; + + /** + * + * @param settings Immutable settings instance + * @param clusterSettings ClusterSettings Instance + * @param threadPool ThreadPool Instance + */ + public AdmissionControlService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + this.threadPool = threadPool; + this.admissionControlSettings = new AdmissionControlSettings(clusterSettings, settings, this); + this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>(); + this.clusterSettings = clusterSettings; + this.settings = settings; + this.initialise(); + } + + /** + * Initialise and Register all the admissionControllers + */ + public void initialise() { + // Initialise different type of admission controllers + registerAdmissionController(IO_BASED_ADMISSION_CONTROLLER); + } + + /** + * Handler to trigger registered admissionController + */ + public boolean applyTransportLayerAdmissionController(String action) { + this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.acquire(admissionControlSettings); }); + return true; + } + + /** + * + * @return singleton admissionControllerService Instance + */ + public static AdmissionControlService getInstance() { + return admissionControlService; + } + + /** + * + * @param settings Immutable settings instance + * @param clusterSettings ClusterSettings Instance + * @param threadPool ThreadPool Instance + * @return singleton admissionControllerService Instance + */ + public static synchronized AdmissionControlService newAdmissionControllerService( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool + ) { + if (admissionControlService == null) { + admissionControlService = new AdmissionControlService(settings, clusterSettings, threadPool); + } + return admissionControlService; + } + + /** + * + * @return true if the admissionController Feature is enabled + */ + public Boolean isTransportLayerAdmissionControllerEnabled() { + return this.admissionControlSettings.isTransportLayerAdmissionControllerEnabled(); + } + + /** + * + * @return true if the admissionController Feature is enabled + */ + public Boolean isTransportLayerAdmissionControllerEnforced() { + return this.admissionControlSettings.isTransportLayerAdmissionControllerEnforced(); + } + + /** + * + * @param admissionControllerName admissionControllerName to register into the service. + */ + public void registerAdmissionController(String admissionControllerName) { + AdmissionController admissionController = this.controllerFactory(admissionControllerName); + if (admissionController != null) { + this.ADMISSION_CONTROLLERS.put(admissionControllerName, admissionController); + } + } + + /** + * @return AdmissionController Instance + */ + private AdmissionController controllerFactory(String admissionControllerName) { + switch (admissionControllerName) { + case IO_BASED_ADMISSION_CONTROLLER: + return new IOBasedAdmissionController(admissionControllerName, this.settings, this.clusterSettings); + default: + return null; + } + } + + /** + * + * @return list of the registered admissionControllers + */ + public List getListAdmissionControllers() { + return new ArrayList<>(this.ADMISSION_CONTROLLERS.values()); + } + + /** + * + * @param controllerName name of the admissionController + * @return instance of the AdmissionController Instance + */ + public AdmissionController getAdmissionController(String controllerName) { + if (this.ADMISSION_CONTROLLERS.containsKey(controllerName)) { + return this.ADMISSION_CONTROLLERS.get(controllerName); + } + return null; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControlSettings.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControlSettings.java new file mode 100644 index 0000000000000..4fbbf9d1299fd --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControlSettings.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControlMode; + +/** + * Settings related to admission controller. + * @opensearch.internal + */ +public final class AdmissionControlSettings { + + /** + * Default parameters for the AdmissionControllerSettings + */ + public static class Defaults { + public static final String MODE = "disabled"; + } + + public static final String IO_BASED_ADMISSION_CONTROLLER = "global_io_usage"; + + /** + * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set + * rejection will be performed, otherwise only rejection metrics will be populated. + */ + public static final Setting ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE = new Setting<>( + "admission_controller.transport.mode", + Defaults.MODE, + AdmissionControlMode::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Feature level setting to force enable feature setting for individual controllers. + * Default each admissionController Settings will be considered but once this flag enabled feature level settings + * will be considered. Will help during turn off entire feature when its required + */ + public static final Setting ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING = Setting.boolSetting( + "admission_controller.force_enable_defaults", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private volatile Boolean forceEnableDefault; + + private volatile AdmissionControlMode transportLayeradmissionControlMode; + + private final AdmissionControlService admissionControlService; + + /** + * + * @param clusterSettings clusterSettings Instance + * @param settings settings instance + * @param admissionControlService AdmissionController Instance + */ + public AdmissionControlSettings(ClusterSettings clusterSettings, Settings settings, AdmissionControlService admissionControlService) { + this.transportLayeradmissionControlMode = ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); + this.forceEnableDefault = ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING, this::setForceEnableDefaultSettings); + clusterSettings.addSettingsUpdateConsumer( + ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + this::setAdmissionControllerTransportLayerMode + ); + this.admissionControlService = admissionControlService; + } + + /** + * + * @param admissionControlMode update the mode of admission controller feature + */ + private void setAdmissionControllerTransportLayerMode(AdmissionControlMode admissionControlMode) { + this.transportLayeradmissionControlMode = admissionControlMode; + } + + /** + * + * @return return the default mode of the admissionController + */ + public AdmissionControlMode getAdmissionControllerTransportLayerMode() { + return this.transportLayeradmissionControlMode; + } + + /** + * + * @param forceEnableDefault true when force enabled feature is enabled else false + */ + public void setForceEnableDefaultSettings(Boolean forceEnableDefault) { + this.forceEnableDefault = forceEnableDefault; + } + + /** + * + * @return true when force enabled feature is enabled else false + */ + public Boolean isForceDefaultSettingsEnabled() { + return this.forceEnableDefault; + } + + /** + * + * @return true based on the admission controller feature is enforced else false + */ + public Boolean isTransportLayerAdmissionControllerEnforced() { + return this.transportLayeradmissionControlMode == AdmissionControlMode.ENFORCED; + } + + /** + * + * @return true based on the admission controller feature is enabled else false + */ + public Boolean isTransportLayerAdmissionControllerEnabled() { + return this.transportLayeradmissionControlMode != AdmissionControlMode.DISABLED; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java new file mode 100644 index 0000000000000..dd9997cf416c5 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; + +import org.opensearch.throttling.admissioncontroller.AdmissionControlSettings; + +/** + * Interface for Admission Controller in OpenSearch, which aims to provide resource based request admission control. + * It provides methods for any tracking-object that can be incremented (such as memory size), + * and admission control can be applied if configured limit has been reached + */ +public interface AdmissionController { + + /** + * Return the current state of the admission controller + * @return true if admissionController is enabled for the transport layer else false + */ + boolean isEnabledForTransportLayer(); + + /** + * Increment the tracking-objects and apply the admission control if threshold is breached. + * Mostly applicable while acquiring the quota. + * @return true if admission controller is successfully acquired on the request else false + */ + boolean acquire(AdmissionControlSettings admissionControlSettings); + + /** + * Decrement the tracking-objects and do not apply the admission control. + * Mostly applicable while remitting the quota. + * @return boolean if admission controller is released the acquired quota. + */ + boolean release(); + + /** + * @return name of the admission-controller + */ + String getName(); + + /** + * Adds the rejection count for the controller. Primarily used when copying controller states. + * @param count To add the value of the tracking resource object as the provided count + */ + void addRejectionCount(long count); + + /** + * @return current value of the rejection count metric tracked by the admission-controller. + */ + long getRejectionCount(); +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java new file mode 100644 index 0000000000000..c2c47c5624429 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.throttling.admissioncontroller.AdmissionControlSettings; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControlMode; +import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings; + +/** + * Class for IO Based Admission Controller in OpenSearch, which aims to provide IO utilisation admission control. + * It provides methods to apply admission control if configured limit has been reached + */ +public class IOBasedAdmissionController implements AdmissionController { + private static final Logger LOGGER = LogManager.getLogger(IOBasedAdmissionController.class); + private final String admissionControllerName; + public IOBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings; + private long rejectionCount; + + /** + * + * @param admissionControllerName State of the admission controller + */ + public IOBasedAdmissionController(String admissionControllerName, Settings settings, ClusterSettings clusterSettings) { + this.admissionControllerName = admissionControllerName; + this.ioBasedAdmissionControllerSettings = new IOBasedAdmissionControllerSettings(clusterSettings, settings, this); + this.rejectionCount = 0; + } + + /** + * Return the current state of the admission controller + * + * @return true if admissionController is enabled for the transport layer else false + */ + @Override + public boolean isEnabledForTransportLayer() { + return this.ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode() != AdmissionControlMode.DISABLED; + } + + /** + * This function will take of applying admission controller based on IO usage + * @return true if admission controller is successfully acquired on the request else false + */ + @Override + public boolean acquire(AdmissionControlSettings admissionControlSettings) { + // TODO Will extend this logic further currently just incrementing rejectionCount + boolean isEnabled = admissionControlSettings.isForceDefaultSettingsEnabled() + ? admissionControlSettings.isTransportLayerAdmissionControllerEnabled() + : this.isEnabledForTransportLayer(); + if (isEnabled) { + this.addRejectionCount(1); + } + return true; + } + + /** + * @return true if the admission controller cleared the objects that acquired else false + */ + @Override + public boolean release() { + return false; + } + + /** + * @return name of the admission Controller + */ + @Override + public String getName() { + return this.admissionControllerName; + } + + /** + * Adds the rejection count for the controller. Primarily used when copying controller states. + * + * @param count To add the value of the tracking resource object as the provided count + */ + @Override + public void addRejectionCount(long count) { + this.rejectionCount = this.rejectionCount + count; + } + + /** + * @return rejection count of the admission controller + */ + public long getRejectionCount() { + return rejectionCount; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java new file mode 100644 index 0000000000000..e81431ccca356 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains classes related to the different admission controllers + */ +package org.opensearch.throttling.admissioncontroller.controllers; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/AdmissionControlMode.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/AdmissionControlMode.java new file mode 100644 index 0000000000000..5fefbf1c9e21c --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/AdmissionControlMode.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.enums; + +import java.util.Locale; + +/** + * Defines the AdmissionControlMode + */ +public enum AdmissionControlMode { + /** + * AdmissionController is completely disabled. + */ + DISABLED("disabled"), + + /** + * AdmissionController only monitors the rejection criteria for the requests. + */ + MONITOR("monitor_only"), + + /** + * AdmissionController monitors and rejects tasks that exceed resource usage thresholds. + */ + ENFORCED("enforced"); + + private final String mode; + + /** + * @param mode update mode of the admission controller + */ + AdmissionControlMode(String mode) { + this.mode = mode; + } + + /** + * + * @return mode of the admission controller + */ + public String getMode() { + return this.mode; + } + + /** + * + * @param name is the mode of the current + * @return Enum of AdmissionControlMode based on the mode + */ + public static AdmissionControlMode fromName(String name) { + switch (name.toLowerCase(Locale.ROOT)) { + case "disabled": + return DISABLED; + case "monitor_only": + return MONITOR; + case "enforced": + return ENFORCED; + } + + throw new IllegalArgumentException("Invalid AdmissionControlMode: " + name); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/TransportRequestActionType.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/TransportRequestActionType.java new file mode 100644 index 0000000000000..af4109579a89c --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/TransportRequestActionType.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.enums; + +import java.util.Locale; + +/** + * Enums that defines the type of the transport requests + */ +public enum TransportRequestActionType { + INDEXING_PRIMARY("indexing_primary"), + INDEXING_REPLICA("indexing_replica"), + INDEXING("indexing"), + SEARCH("search"); + + private final String type; + + TransportRequestActionType(String uriType) { + this.type = uriType; + } + + /** + * + * @return type of the request + */ + public String getType() { + return type; + } + + public static TransportRequestActionType fromName(String name) { + name = name.toLowerCase(Locale.ROOT); + switch (name) { + case "indexing_primary": + return INDEXING_PRIMARY; + case "indexing_replica": + return INDEXING_REPLICA; + case "search": + return SEARCH; + case "indexing": + return INDEXING; + } + throw new IllegalArgumentException("Invalid TransportRequestActionType: " + name); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/package-info.java new file mode 100644 index 0000000000000..71c1c8f50de02 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains enums related to the different admission controller feature + */ +package org.opensearch.throttling.admissioncontroller.enums; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java new file mode 100644 index 0000000000000..8dae7b3992b84 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * This package contains base classes needed for the admissionController Feature + */ +package org.opensearch.throttling.admissioncontroller; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettings.java new file mode 100644 index 0000000000000..04958de4e9a07 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettings.java @@ -0,0 +1,94 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.settings; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControlMode; +import org.opensearch.throttling.admissioncontroller.enums.TransportRequestActionType; + +import java.util.Arrays; +import java.util.List; + +/** + * Settings related to io based admission controller. + * @opensearch.internal + */ +public class IOBasedAdmissionControllerSettings { + + /** + * Default parameters for the IOBasedAdmissionControllerSettings + */ + public static class Defaults { + public static final String MODE = "disabled"; + public static List TRANSPORT_LAYER_DEFAULT_URI_TYPE = Arrays.asList("indexing", "search"); + } + + private final IOBasedAdmissionController ioBasedAdmissionController; + private AdmissionControlMode transportLayerMode; + private List transportRequestURIList; + + /** + * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set + * rejection will be performed, otherwise only rejection metrics will be populated. + */ + public static final Setting IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE = new Setting<>( + "admission_controller.transport.global_io_usage.mode", + Defaults.MODE, + AdmissionControlMode::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting> IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST = Setting.listSetting( + "admission_controller.transport.global_io_usage.uri_list", + Defaults.TRANSPORT_LAYER_DEFAULT_URI_TYPE, + TransportRequestActionType::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + // currently limited to one setting will add further more settings in follow-up PR's + public IOBasedAdmissionControllerSettings( + ClusterSettings clusterSettings, + Settings settings, + IOBasedAdmissionController ioBasedAdmissionController + ) { + this.ioBasedAdmissionController = ioBasedAdmissionController; + this.transportLayerMode = IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); + clusterSettings.addSettingsUpdateConsumer(IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); + this.transportRequestURIList = IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST.get(settings); + clusterSettings.addSettingsUpdateConsumer( + IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST, + this::setIoBasedAdmissionControllerTransportUriList + ); + } + + private void setTransportLayerMode(AdmissionControlMode admissionControlMode) { + this.transportLayerMode = admissionControlMode; + } + + private void setIoBasedAdmissionControllerTransportUriList(List transportRequestURIList) { + this.transportRequestURIList = transportRequestURIList; + } + + public IOBasedAdmissionController getIoBasedAdmissionController() { + return ioBasedAdmissionController; + } + + public AdmissionControlMode getTransportLayerAdmissionControllerMode() { + return transportLayerMode; + } + + public List getTransportRequestURIList() { + return transportRequestURIList; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/package-info.java new file mode 100644 index 0000000000000..25c0a8cb93ac7 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/package-info.java @@ -0,0 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** + * This package contains settings related classes for the different admission controllers + */ +package org.opensearch.throttling.admissioncontroller.settings; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportHandler.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportHandler.java new file mode 100644 index 0000000000000..2a6b8e32c74f6 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportHandler.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.tasks.Task; +import org.opensearch.throttling.admissioncontroller.AdmissionControlService; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * AdmissionController Handler to intercept Transport Requests. + * @param Transport Request + */ +public class AdmissionControlTransportHandler implements TransportRequestHandler { + + private final String action; + private final TransportRequestHandler actualHandler; + protected final Logger log = LogManager.getLogger(this.getClass()); + AdmissionControlService admissionControlService; + + public AdmissionControlTransportHandler( + String action, + TransportRequestHandler actualHandler, + AdmissionControlService admissionControlService + ) { + super(); + this.action = action; + this.actualHandler = actualHandler; + this.admissionControlService = admissionControlService; + } + + /** + * @param request Transport Request that landed on the node + * @param channel Transport channel allows to send a response to a request + * @param task Current task that is executing + * @throws Exception when admission control rejected the requests + */ + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + // intercept all the transport requests here and apply admission control + try { + this.admissionControlService.applyTransportLayerAdmissionController(this.action); + } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { + channel.sendResponse(openSearchRejectedExecutionException); + throw openSearchRejectedExecutionException; + } catch (final Exception e) { + throw e; + } + this.messageReceivedDecorate(request, actualHandler, channel, task); + } + + /** + * + * @param request Transport Request that landed on the node + * @param actualHandler is the next handler to intercept the request + * @param transportChannel Transport channel allows to send a response to a request + * @param task Current task that is executing + * @throws Exception from the next handler + */ + protected void messageReceivedDecorate( + final T request, + final TransportRequestHandler actualHandler, + final TransportChannel transportChannel, + Task task + ) throws Exception { + actualHandler.messageReceived(request, transportChannel, task); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportInterceptor.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportInterceptor.java new file mode 100644 index 0000000000000..85e6c06df083c --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportInterceptor.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.transport; + +import org.opensearch.throttling.admissioncontroller.AdmissionControlService; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * This class allows throttling to intercept requests on both the sender and the receiver side. + */ +public class AdmissionControlTransportInterceptor implements TransportInterceptor { + + AdmissionControlService admissionControlService; + + public AdmissionControlTransportInterceptor(AdmissionControlService admissionControlService) { + this.admissionControlService = admissionControlService; + } + + /** + * + * @return admissionController handler to intercept transport requests + */ + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + return new AdmissionControlTransportHandler<>(action, actualHandler, this.admissionControlService); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java new file mode 100644 index 0000000000000..78edf330b9afc --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java @@ -0,0 +1,11 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +/** + * This package contains transport related classes for the admissionController Feature + */ +package org.opensearch.throttling.admissioncontroller.transport; diff --git a/server/src/main/java/org/opensearch/throttling/package-info.java b/server/src/main/java/org/opensearch/throttling/package-info.java new file mode 100644 index 0000000000000..051c00a0bb1e8 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base OpenSearch Throttling package + */ +package org.opensearch.throttling; diff --git a/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControlServiceTests.java new file mode 100644 index 0000000000000..27e68a91b6914 --- /dev/null +++ b/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControlServiceTests.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.controllers.AdmissionController; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControlMode; +import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings; + +import java.util.List; + +public class AdmissionControlServiceTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + private AdmissionControlService admissionControlService; + + private String action = ""; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + action = "indexing"; + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testWhenAdmissionControllerDisabled() { + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + assertFalse(admissionControlService.isTransportLayerAdmissionControllerEnabled()); + assertFalse(admissionControlService.isTransportLayerAdmissionControllerEnforced()); + + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()) + .build(); + + clusterService.getClusterSettings().applySettings(settings); + assertTrue(admissionControlService.isTransportLayerAdmissionControllerEnabled()); + assertFalse(admissionControlService.isTransportLayerAdmissionControllerEnforced()); + + settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode()) + .build(); + + clusterService.getClusterSettings().applySettings(settings); + assertTrue(admissionControlService.isTransportLayerAdmissionControllerEnabled()); + assertTrue(admissionControlService.isTransportLayerAdmissionControllerEnforced()); + } + + public void testWhenAdmissionControllerRegistered() { + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + assertEquals(admissionControlService.getListAdmissionControllers().size(), 1); + } + + public void testApplyAdmissionControllerDisabled() { + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControlService.applyTransportLayerAdmissionController(this.action); + List admissionControllerList = admissionControlService.getListAdmissionControllers(); + admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(), 0); }); + } + + public void testApplyAdmissionControllerEnabled() { + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControlService.applyTransportLayerAdmissionController(this.action); + assertEquals( + admissionControlService.getAdmissionController(AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount(), + 0 + ); + + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + + admissionControlService.applyTransportLayerAdmissionController(this.action); + List admissionControllerList = admissionControlService.getListAdmissionControllers(); + assertEquals(admissionControllerList.size(), 1); + assertEquals( + admissionControlService.getAdmissionController(AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount(), + 1 + ); + } + + public void testWhenForceEnablementDisabled() { + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode()) + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.getKey(), false) + .build(); + clusterService.getClusterSettings().applySettings(settings); + admissionControlService.applyTransportLayerAdmissionController(this.action); + assertEquals( + admissionControlService.getAdmissionController(AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount(), + 1 + ); + } + + public void testWhenForceEnablementEnabled() { + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED.getMode()) + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.getKey(), true) + .build(); + clusterService.getClusterSettings().applySettings(settings); + admissionControlService.applyTransportLayerAdmissionController(this.action); + assertEquals( + admissionControlService.getAdmissionController(AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount(), + 0 + ); + + Settings newSettings = Settings.builder() + .put(settings) + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.getKey(), false) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + admissionControlService.applyTransportLayerAdmissionController(this.action); + assertEquals( + admissionControlService.getAdmissionController(AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER).getRejectionCount(), + 1 + ); + } +} diff --git a/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControlSettingsTests.java b/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControlSettingsTests.java new file mode 100644 index 0000000000000..79c03a3939a7f --- /dev/null +++ b/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControlSettingsTests.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControlMode; + +import java.util.Arrays; +import java.util.Set; + +import static org.mockito.Mockito.mock; + +public class AdmissionControlSettingsTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testSettingsExists() { + Set> settings = ClusterSettings.BUILT_IN_CLUSTER_SETTINGS; + assertTrue( + "All the admission controller settings should be supported built in settings", + settings.containsAll( + Arrays.asList( + AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + AdmissionControlSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING + ) + ) + ); + } + + public void testDefaultSettings() { + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(AdmissionControlService.class) + ); + + assertFalse(admissionControlSettings.isTransportLayerAdmissionControllerEnabled()); + assertFalse(admissionControlSettings.isTransportLayerAdmissionControllerEnforced()); + assertFalse(admissionControlSettings.isForceDefaultSettingsEnabled()); + } + + public void testGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode()) + .build(); + + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings( + clusterService.getClusterSettings(), + settings, + mock(AdmissionControlService.class) + ); + + assertTrue(admissionControlSettings.isTransportLayerAdmissionControllerEnabled()); + assertTrue(admissionControlSettings.isTransportLayerAdmissionControllerEnforced()); + assertFalse(admissionControlSettings.isForceDefaultSettingsEnabled()); + } + + public void testUpdateAfterGetDefaultSettings() { + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(AdmissionControlService.class) + ); + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()) + .build(); + clusterService.getClusterSettings().applySettings(settings); + assertTrue(admissionControlSettings.isTransportLayerAdmissionControllerEnabled()); + assertFalse(admissionControlSettings.isTransportLayerAdmissionControllerEnforced()); + assertFalse(admissionControlSettings.isForceDefaultSettingsEnabled()); + } + + public void testUpdateAfterGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()) + .build(); + + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings( + clusterService.getClusterSettings(), + settings, + mock(AdmissionControlService.class) + ); + + Settings newSettings = Settings.builder() + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode()) + .build(); + + clusterService.getClusterSettings().applySettings(newSettings); + + assertTrue(admissionControlSettings.isTransportLayerAdmissionControllerEnabled()); + assertTrue(admissionControlSettings.isTransportLayerAdmissionControllerEnforced()); + assertFalse(admissionControlSettings.isForceDefaultSettingsEnabled()); + + newSettings = Settings.builder() + .put(newSettings) + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.getKey(), true) + .build(); + + clusterService.getClusterSettings().applySettings(newSettings); + assertTrue(admissionControlSettings.isForceDefaultSettingsEnabled()); + assertTrue(admissionControlSettings.isTransportLayerAdmissionControllerEnabled()); + assertTrue(admissionControlSettings.isTransportLayerAdmissionControllerEnforced()); + } +} diff --git a/server/src/test/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionControllerTests.java new file mode 100644 index 0000000000000..522615770a79c --- /dev/null +++ b/server/src/test/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionControllerTests.java @@ -0,0 +1,158 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControlService; +import org.opensearch.throttling.admissioncontroller.AdmissionControlSettings; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControlMode; +import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings; + +import static org.mockito.Mockito.mock; + +public class IOBasedAdmissionControllerTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + IOBasedAdmissionController ioBasedAdmissionController = null; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testCheckDefaultParameters() { + ioBasedAdmissionController = new IOBasedAdmissionController( + AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER, + Settings.EMPTY, + clusterService.getClusterSettings() + ); + assertEquals(ioBasedAdmissionController.getName(), AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + assertEquals( + ioBasedAdmissionController.ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), + AdmissionControlMode.DISABLED + ); + assertFalse(ioBasedAdmissionController.isEnabledForTransportLayer()); + } + + public void testCheckUpdateSettings() { + ioBasedAdmissionController = new IOBasedAdmissionController( + AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER, + Settings.EMPTY, + clusterService.getClusterSettings() + ); + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + + assertEquals(ioBasedAdmissionController.getName(), AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + assertEquals( + ioBasedAdmissionController.ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), + AdmissionControlMode.ENFORCED + ); + assertTrue(ioBasedAdmissionController.isEnabledForTransportLayer()); + } + + public void testApplyControllerWithDefaultSettings() { + ioBasedAdmissionController = new IOBasedAdmissionController( + AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER, + Settings.EMPTY, + clusterService.getClusterSettings() + ); + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(AdmissionControlService.class) + ); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + assertEquals( + ioBasedAdmissionController.ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), + AdmissionControlMode.DISABLED + ); + ioBasedAdmissionController.acquire(admissionControlSettings); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + } + + public void testApplyControllerWhenSettingsEnabled() { + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + ioBasedAdmissionController = new IOBasedAdmissionController( + AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER, + settings, + clusterService.getClusterSettings() + ); + assertTrue(ioBasedAdmissionController.isEnabledForTransportLayer()); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(AdmissionControlService.class) + ); + ioBasedAdmissionController.acquire(admissionControlSettings); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 1); + } + + public void testApplyControllerWhenForceDefaultEnabled() { + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .build(); + ioBasedAdmissionController = new IOBasedAdmissionController( + AdmissionControlSettings.IO_BASED_ADMISSION_CONTROLLER, + settings, + clusterService.getClusterSettings() + ); + assertTrue(ioBasedAdmissionController.isEnabledForTransportLayer()); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + AdmissionControlSettings admissionControlSettings = new AdmissionControlSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(AdmissionControlService.class) + ); + ioBasedAdmissionController.acquire(admissionControlSettings); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 1); + Settings newSettings = Settings.builder() + .put(settings) + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.getKey(), true) + .put(AdmissionControlSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + assertTrue(ioBasedAdmissionController.isEnabledForTransportLayer()); + ioBasedAdmissionController.acquire(admissionControlSettings); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 1); + } +} diff --git a/server/src/test/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControlSettingsTests.java b/server/src/test/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControlSettingsTests.java new file mode 100644 index 0000000000000..3da8b6fed555f --- /dev/null +++ b/server/src/test/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControlSettingsTests.java @@ -0,0 +1,152 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.settings; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControlMode; +import org.opensearch.throttling.admissioncontroller.enums.TransportRequestActionType; + +import java.util.Arrays; +import java.util.Set; + +import static org.mockito.Mockito.mock; + +public class IOBasedAdmissionControlSettingsTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testSettingsExists() { + Set> settings = ClusterSettings.BUILT_IN_CLUSTER_SETTINGS; + assertTrue( + "All the io based admission controller settings should be supported built in settings", + settings.containsAll( + Arrays.asList( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST + ) + ) + ); + } + + public void testDefaultSettings() { + IOBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IOBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(IOBasedAdmissionController.class) + ); + assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); + assertEquals( + ioBasedAdmissionControllerSettings.getTransportRequestURIList(), + Arrays.asList(TransportRequestActionType.INDEXING, TransportRequestActionType.SEARCH) + ); + } + + public void testGetConfiguredSettings() { + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .putList( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST.getKey(), + Arrays.asList("indexing_primary", "search") + ) + .build(); + + IOBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IOBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + settings, + mock(IOBasedAdmissionController.class) + ); + assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertEquals( + ioBasedAdmissionControllerSettings.getTransportRequestURIList(), + Arrays.asList(TransportRequestActionType.INDEXING_PRIMARY, TransportRequestActionType.SEARCH) + ); + } + + public void testUpdateAfterGetDefaultSettings() { + IOBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IOBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(IOBasedAdmissionController.class) + ); + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .putList( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST.getKey(), + Arrays.asList("indexing_replica", "search") + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + assertEquals( + ioBasedAdmissionControllerSettings.getTransportRequestURIList(), + Arrays.asList(TransportRequestActionType.INDEXING_REPLICA, TransportRequestActionType.SEARCH) + ); + } + + public void testUpdateAfterGetConfiguredSettings() { + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.ENFORCED.getMode() + ) + .putList( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST.getKey(), + Arrays.asList("indexing_replica", "search") + ) + .build(); + + IOBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IOBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + settings, + mock(IOBasedAdmissionController.class) + ); + assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); + Settings newSettings = Settings.builder() + .put(settings) + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .putList(IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST.getKey(), Arrays.asList("search")) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.MONITOR); + assertEquals(ioBasedAdmissionControllerSettings.getTransportRequestURIList(), Arrays.asList(TransportRequestActionType.SEARCH)); + } +} diff --git a/server/src/test/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportHandlerTests.java b/server/src/test/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportHandlerTests.java new file mode 100644 index 0000000000000..ef26893acc8cb --- /dev/null +++ b/server/src/test/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControlTransportHandlerTests.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.transport; + +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.throttling.admissioncontroller.AdmissionControlService; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class AdmissionControlTransportHandlerTests extends OpenSearchTestCase { + AdmissionControlTransportHandler admissionControlTransportHandler; + private String testAction = ""; + + public void testHandlerInvoked() throws Exception { + String action = "TEST"; + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControlTransportHandler = new AdmissionControlTransportHandler( + action, + handler, + mock(AdmissionControlService.class) + ); + admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + assertEquals(1, handler.count); + } + + public void testHandlerInvokedRejectedException() throws Exception { + String action = "TEST"; + AdmissionControlService admissionControlService = mock(AdmissionControlService.class); + when(admissionControlService.applyTransportLayerAdmissionController(testAction)).thenThrow( + new OpenSearchRejectedExecutionException() + ); + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControlTransportHandler = new AdmissionControlTransportHandler(action, handler, admissionControlService); + try { + admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + } catch (OpenSearchRejectedExecutionException exception) { + assertEquals(0, handler.count); + handler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + } + assertEquals(1, handler.count); + } + + public void testHandlerInvokedRandomException() throws Exception { + String action = "TEST"; + AdmissionControlService admissionControlService = mock(AdmissionControlService.class); + when(admissionControlService.applyTransportLayerAdmissionController(testAction)).thenThrow(new NullPointerException()); + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControlTransportHandler = new AdmissionControlTransportHandler(action, handler, admissionControlService); + try { + admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + } catch (Exception exception) { + assertEquals(0, handler.count); + handler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + } + assertEquals(1, handler.count); + } + + private class InterceptingRequestHandler implements TransportRequestHandler { + private final String action; + public int count; + + public InterceptingRequestHandler(String action) { + this.action = action; + this.count = 0; + } + + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + this.count = this.count + 1; + } + } +} diff --git a/settings.gradle b/settings.gradle index c04b5997d49b1..009c8b732fdd5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -153,3 +153,6 @@ if (extraProjects.exists()) { addSubProjects('', extraProjectDir) } } +include 'modules:throttling' +findProject(':modules:throttling')?.name = 'throttling' +