diff --git a/CHANGELOG.md b/CHANGELOG.md index 529278c188f46..93a80877d7a1e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107)) - Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679)) - Added encryption-sdk lib to provide encryption and decryption capabilities ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466)) +- Admission Controller Module Transport Interceptor Initial Commit ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/modules/throttling/build.gradle b/modules/throttling/build.gradle new file mode 100644 index 0000000000000..40fef23c137e4 --- /dev/null +++ b/modules/throttling/build.gradle @@ -0,0 +1,14 @@ +apply plugin: 'opensearch.java-rest-test' + +opensearchplugin { + description 'Plugin intercepting requests and throttle based on resource consumption' + classname 'org.opensearch.throttling.OpenSearchThrottlingModulePlugin' +} + +dependencies { + api project(path: ':modules:reindex') +} + +testClusters.all { + module ':modules:reindex' +} diff --git a/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java b/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java new file mode 100644 index 0000000000000..953d65b758aee --- /dev/null +++ b/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling; + +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; +import org.opensearch.throttling.admissioncontroller.transport.AdmissionControllerTransportInterceptor; +import org.opensearch.transport.TransportInterceptor; + +import java.util.ArrayList; +import java.util.List; + +/** + * This plugin is used to register handlers to intercept both rest and transport requests. + */ +public class OpenSearchThrottlingModulePlugin extends Plugin implements NetworkPlugin { + + AdmissionControllerService admissionControllerService; + + /** + * Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing + * transport (inter-node) requests. This must not return null + * + * @param namedWriteableRegistry registry of all named writeables registered + * @param threadContext a {@link ThreadContext} of the current nodes or clients that can be used to set additional + * headers in the interceptors + * @return list of transport interceptors + */ + @Override + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { + this.admissionControllerService = AdmissionControllerService.getInstance(); + List interceptors = new ArrayList<>(0); + // TODO Will throw exception in next PR's. This needs to ensure the service is up before adding into transport interceptor. + if (this.admissionControllerService != null) { + interceptors.add(new AdmissionControllerTransportInterceptor(this.admissionControllerService)); + } + return interceptors; + } +} diff --git a/modules/throttling/src/main/plugin-metadata/plugin-security.policy b/modules/throttling/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 0000000000000..ccfd6ba70dd16 --- /dev/null +++ b/modules/throttling/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +grant { + // needed to generate runtime classes + permission java.lang.RuntimePermission "createClassLoader"; + + // needed to find the classloader to load allowlisted classes from + permission java.lang.RuntimePermission "getClassLoader"; +}; diff --git a/modules/throttling/src/test/java/org/opensearch/throttling/OpenSearchThrottlingModulePluginTest.java b/modules/throttling/src/test/java/org/opensearch/throttling/OpenSearchThrottlingModulePluginTest.java new file mode 100644 index 0000000000000..775f174622f9e --- /dev/null +++ b/modules/throttling/src/test/java/org/opensearch/throttling/OpenSearchThrottlingModulePluginTest.java @@ -0,0 +1,67 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; +import org.opensearch.throttling.admissioncontroller.transport.AdmissionControllerTransportInterceptor; +import org.opensearch.transport.TransportInterceptor; + +import java.util.List; + +import static org.mockito.Mockito.mock; + +public class OpenSearchThrottlingModulePluginTest extends OpenSearchTestCase { + OpenSearchThrottlingModulePlugin openSearchThrottlingModulePlugin; + private ClusterService clusterService; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + super.setUp(); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testGetTransportInterceptors() { + openSearchThrottlingModulePlugin = new OpenSearchThrottlingModulePlugin(); + List interceptors = openSearchThrottlingModulePlugin.getTransportInterceptors( + mock(NamedWriteableRegistry.class), + threadPool.getThreadContext() + ); + assertEquals(interceptors.size(), 0); + AdmissionControllerService admissionControllerService = AdmissionControllerService.newAdmissionControllerService( + Settings.EMPTY, + clusterService.getClusterSettings(), + threadPool + ); + interceptors = openSearchThrottlingModulePlugin.getTransportInterceptors( + mock(NamedWriteableRegistry.class), + threadPool.getThreadContext() + ); + assertEquals(interceptors.size(), 1); + assertEquals(interceptors.get(0).getClass(), AdmissionControllerTransportInterceptor.class); + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 05938914b019f..dab8541c82071 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -151,6 +151,8 @@ import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings; +import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings; import org.opensearch.transport.ProxyConnectionStrategy; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.RemoteConnectionStrategy; @@ -237,6 +239,10 @@ public void apply(Settings value, Settings current, Settings previous) { public static Set> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet( new HashSet<>( Arrays.asList( + AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + AdmissionControllerSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING, + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 51cc7c9007159..af6b2927c383b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -222,6 +222,7 @@ import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportInterceptor; @@ -749,6 +750,12 @@ protected Node( fileCacheCleaner ); + final AdmissionControllerService admissionControllerService = AdmissionControllerService.newAdmissionControllerService( + settings, + clusterService.getClusterSettings(), + threadPool + ); + final AliasValidator aliasValidator = new AliasValidator(); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices); @@ -1094,6 +1101,7 @@ protected Node( b.bind(IndexingPressureService.class).toInstance(indexingPressureService); b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService); b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService); + b.bind(AdmissionControllerService.class).toInstance(admissionControllerService); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerService.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerService.java new file mode 100644 index 0000000000000..6fab9354a39ff --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerService.java @@ -0,0 +1,153 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.controllers.AdmissionController; +import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER; + +/** + * Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch. + */ +public class AdmissionControllerService { + private final ThreadPool threadPool; + public final AdmissionControllerSettings admissionControllerSettings; + private final ConcurrentMap ADMISSION_CONTROLLERS; + private static AdmissionControllerService admissionControllerService = null; + private static final Logger logger = LogManager.getLogger(AdmissionControllerService.class); + private final ClusterSettings clusterSettings; + private final Settings settings; + + /** + * + * @param settings Immutable settings instance + * @param clusterSettings ClusterSettings Instance + * @param threadPool ThreadPool Instance + */ + public AdmissionControllerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + this.threadPool = threadPool; + this.admissionControllerSettings = new AdmissionControllerSettings(clusterSettings, settings, this); + this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>(); + this.clusterSettings = clusterSettings; + this.settings = settings; + this.initialise(); + } + + /** + * Initialise and Register all the admissionControllers + */ + public void initialise() { + // Initialise different type of admission controllers + registerAdmissionController(IO_BASED_ADMISSION_CONTROLLER); + } + + /** + * Handler to trigger registered admissionController + */ + public boolean applyTransportLayerAdmissionController(String action) { + this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.acquire(admissionControllerSettings); }); + return true; + } + + /** + * + * @return singleton admissionControllerService Instance + */ + public static AdmissionControllerService getInstance() { + return admissionControllerService; + } + + /** + * + * @param settings Immutable settings instance + * @param clusterSettings ClusterSettings Instance + * @param threadPool ThreadPool Instance + * @return singleton admissionControllerService Instance + */ + public static synchronized AdmissionControllerService newAdmissionControllerService( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool + ) { + if (admissionControllerService == null) { + admissionControllerService = new AdmissionControllerService(settings, clusterSettings, threadPool); + } + return admissionControllerService; + } + + /** + * + * @return true if the admissionController Feature is enabled + */ + public Boolean isTransportLayerAdmissionControllerEnabled() { + return this.admissionControllerSettings.isTransportLayerAdmissionControllerEnabled(); + } + + /** + * + * @return true if the admissionController Feature is enabled + */ + public Boolean isTransportLayerAdmissionControllerEnforced() { + return this.admissionControllerSettings.isTransportLayerAdmissionControllerEnforced(); + } + + /** + * + * @param admissionControllerName admissionControllerName to register into the service. + */ + public void registerAdmissionController(String admissionControllerName) { + AdmissionController admissionController = this.controllerFactory(admissionControllerName); + if (admissionController != null) { + this.ADMISSION_CONTROLLERS.put(admissionControllerName, admissionController); + } + } + + /** + * @return AdmissionController Instance + */ + private AdmissionController controllerFactory(String admissionControllerName) { + switch (admissionControllerName) { + case IO_BASED_ADMISSION_CONTROLLER: + return new IOBasedAdmissionController(admissionControllerName, this.settings, this.clusterSettings); + default: + return null; + } + } + + /** + * + * @return list of the registered admissionControllers + */ + public List getListAdmissionControllers() { + return new ArrayList<>(this.ADMISSION_CONTROLLERS.values()); + } + + /** + * + * @param controllerName name of the admissionController + * @return instance of the AdmissionController Instance + */ + public AdmissionController getAdmissionController(String controllerName) { + if (this.ADMISSION_CONTROLLERS.containsKey(controllerName)) { + return this.ADMISSION_CONTROLLERS.get(controllerName); + } + return null; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettings.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettings.java new file mode 100644 index 0000000000000..ccef69a5b9087 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettings.java @@ -0,0 +1,127 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControllerMode; + +/** + * Settings related to admission controller. + * @opensearch.internal + */ +public final class AdmissionControllerSettings { + + public static class Defaults { + public static final String MODE = "disabled"; + } + + public static final String IO_BASED_ADMISSION_CONTROLLER = "global_io_usage"; + + /** + * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set + * rejection will be performed, otherwise only rejection metrics will be populated. + */ + + public static final Setting ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE = new Setting<>( + "admission_controller.transport.mode", + Defaults.MODE, + AdmissionControllerMode::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Feature level setting to force enable feature setting for individual controllers. + * Default each admissionController Settings will be considered but once this flag enabled feature level settings + * will be considered. Will help during turn off entire feature when its required + */ + public static final Setting ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING = Setting.boolSetting( + "admission_controller.force_enable_defaults", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private volatile Boolean forceEnableDefault; + + private volatile AdmissionControllerMode transportLayeradmissionControllerMode; + + private final AdmissionControllerService admissionControllerService; + + /** + * + * @param clusterSettings clusterSettings Instance + * @param settings settings instance + * @param admissionControllerService AdmissionController Instance + */ + public AdmissionControllerSettings( + ClusterSettings clusterSettings, + Settings settings, + AdmissionControllerService admissionControllerService + ) { + this.transportLayeradmissionControllerMode = ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); + this.forceEnableDefault = ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING, this::setForceEnableDefaultSettings); + clusterSettings.addSettingsUpdateConsumer( + ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + this::setAdmissionControllerTransportLayerMode + ); + this.admissionControllerService = admissionControllerService; + } + + /** + * + * @param admissionControllerMode update the mode of admission controller feature + */ + private void setAdmissionControllerTransportLayerMode(AdmissionControllerMode admissionControllerMode) { + this.transportLayeradmissionControllerMode = admissionControllerMode; + } + + /** + * + * @return return the default mode of the admissionController + */ + public AdmissionControllerMode getAdmissionControllerTransportLayerMode() { + return this.transportLayeradmissionControllerMode; + } + + /** + * + * @param forceEnableDefault true when force enabled feature is enabled else false + */ + public void setForceEnableDefaultSettings(Boolean forceEnableDefault) { + this.forceEnableDefault = forceEnableDefault; + } + + /** + * + * @return true when force enabled feature is enabled else false + */ + public Boolean isForceDefaultSettingsEnabled() { + return this.forceEnableDefault; + } + + /** + * + * @return true based on the admission controller feature is enforced else false + */ + public Boolean isTransportLayerAdmissionControllerEnforced() { + return this.transportLayeradmissionControllerMode == AdmissionControllerMode.ENFORCED; + } + + /** + * + * @return true based on the admission controller feature is enabled else false + */ + public Boolean isTransportLayerAdmissionControllerEnabled() { + return this.transportLayeradmissionControllerMode != AdmissionControllerMode.DISABLED; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java new file mode 100644 index 0000000000000..7cdd2cc110c91 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; + +import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings; + +/** + * Interface for Admission Controller in OpenSearch, which aims to provide resource based request admission control. + * It provides methods for any tracking-object that can be incremented (such as memory size), + * and admission control can be applied if configured limit has been reached + */ +public interface AdmissionController { + + /** + * Return the current state of the admission controller + * @return true if admissionController is enabled for the transport layer else false + */ + boolean isEnabledForTransportLayer(); + + /** + * Increment the tracking-objects and apply the admission control if threshold is breached. + * Mostly applicable while acquiring the quota. + * @return true if admission controller is successfully acquired on the request else false + */ + boolean acquire(AdmissionControllerSettings admissionControllerSettings); + + /** + * Decrement the tracking-objects and do not apply the admission control. + * Mostly applicable while remitting the quota. + * @return boolean if admission controller is released the acquired quota. + */ + boolean release(); + + /** + * @return name of the admission-controller + */ + String getName(); + + /** + * Adds the rejection count for the controller. Primarily used when copying controller states. + * @param count To add the value of the tracking resource object as the provided count + */ + void addRejectionCount(long count); + + /** + * @return current value of the rejection count metric tracked by the admission-controller. + */ + long getRejectionCount(); +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java new file mode 100644 index 0000000000000..f5d1ec56977de --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java @@ -0,0 +1,97 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControllerMode; +import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings; + +/** + * Class for IO Based Admission Controller in OpenSearch, which aims to provide IO utilisation admission control. + * It provides methods to apply admission control if configured limit has been reached + */ +public class IOBasedAdmissionController implements AdmissionController { + private static final Logger LOGGER = LogManager.getLogger(IOBasedAdmissionController.class); + private final String admissionControllerName; + public IOBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings; + private long rejectionCount; + + /** + * + * @param admissionControllerName State of the admission controller + */ + public IOBasedAdmissionController(String admissionControllerName, Settings settings, ClusterSettings clusterSettings) { + this.admissionControllerName = admissionControllerName; + this.ioBasedAdmissionControllerSettings = new IOBasedAdmissionControllerSettings(clusterSettings, settings, this); + this.rejectionCount = 0; + } + + /** + * Return the current state of the admission controller + * + * @return true if admissionController is enabled for the transport layer else false + */ + @Override + public boolean isEnabledForTransportLayer() { + return this.ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode() != AdmissionControllerMode.DISABLED; + } + + /** + * This function will take of applying admission controller based on IO usage + * @return true if admission controller is successfully acquired on the request else false + */ + @Override + public boolean acquire(AdmissionControllerSettings admissionControllerSettings) { + // TODO Will extend this logic further currently just incrementing rejectionCount + boolean isEnabled = admissionControllerSettings.isForceDefaultSettingsEnabled() + ? admissionControllerSettings.isTransportLayerAdmissionControllerEnabled() + : this.isEnabledForTransportLayer(); + if (isEnabled) { + this.addRejectionCount(1); + } + return true; + } + + /** + * @return true if the admission controller cleared the objects that acquired else false + */ + @Override + public boolean release() { + return false; + } + + /** + * @return name of the admission Controller + */ + @Override + public String getName() { + return this.admissionControllerName; + } + + /** + * Adds the rejection count for the controller. Primarily used when copying controller states. + * + * @param count To add the value of the tracking resource object as the provided count + */ + @Override + public void addRejectionCount(long count) { + this.rejectionCount = this.rejectionCount + count; + } + + /** + * @return rejection count of the admission controller + */ + public long getRejectionCount() { + return rejectionCount; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java new file mode 100644 index 0000000000000..4b9c1b6f3864a --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/AdmissionControllerMode.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/AdmissionControllerMode.java new file mode 100644 index 0000000000000..517e4ed30e579 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/AdmissionControllerMode.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.enums; + +import java.util.Locale; + +/** + * Defines the AdmissionControllerMode + */ +public enum AdmissionControllerMode { + /** + * AdmissionController is completely disabled. + */ + DISABLED("disabled"), + + /** + * AdmissionController only monitors the rejection criteria for the requests. + */ + MONITOR("monitor_only"), + + /** + * AdmissionController monitors and rejects tasks that exceed resource usage thresholds. + */ + ENFORCED("enforced"); + + private final String mode; + + /** + * @param mode update mode of the admission controller + */ + AdmissionControllerMode(String mode) { + this.mode = mode; + } + + /** + * + * @return mode of the admission controller + */ + public String getMode() { + return this.mode; + } + + /** + * + * @param name is the mode of the current + * @return Enum of AdmissionControllerMode based on the mode + */ + public static AdmissionControllerMode fromName(String name) { + switch (name.toLowerCase(Locale.ROOT)) { + case "disabled": + return DISABLED; + case "monitor_only": + return MONITOR; + case "enforced": + return ENFORCED; + } + + throw new IllegalArgumentException("Invalid AdmissionControllerMode: " + name); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/TransportRequestURIType.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/TransportRequestURIType.java new file mode 100644 index 0000000000000..5b00288cf923d --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/TransportRequestURIType.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.enums; + +import java.util.Locale; + +public enum TransportRequestURIType { + INDEXING_PRIMARY("indexing_primary"), + INDEXING_REPLICA("indexing_replica"), + INDEXING("indexing"), + SEARCH("search"); + + private final String type; + + TransportRequestURIType(String uriType) { + this.type = uriType; + } + + /** + * + * @return type of the request + */ + public String getType() { + return type; + } + + public static TransportRequestURIType fromName(String name) { + name = name.toLowerCase(Locale.ROOT); + switch (name) { + case "indexing_primary": + return INDEXING_PRIMARY; + case "indexing_replica": + return INDEXING_REPLICA; + case "search": + return SEARCH; + case "indexing": + return INDEXING; + } + throw new IllegalArgumentException("Invalid TransportRequestURIType: " + name); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java new file mode 100644 index 0000000000000..a6abeb832a38a --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettings.java new file mode 100644 index 0000000000000..70ec08c66bce8 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettings.java @@ -0,0 +1,91 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.settings; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControllerMode; +import org.opensearch.throttling.admissioncontroller.enums.TransportRequestURIType; + +import java.util.Arrays; +import java.util.List; + +/** + * Settings related to io based admission controller. + * @opensearch.internal + */ +public class IOBasedAdmissionControllerSettings { + + public static class Defaults { + public static final String MODE = "disabled"; + public static List TRANSPORT_LAYER_DEFAULT_URI_TYPE = Arrays.asList("indexing", "search"); + } + + private final IOBasedAdmissionController ioBasedAdmissionController; + private AdmissionControllerMode transportLayerMode; + private List transportRequestURIList; + + /** + * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set + * rejection will be performed, otherwise only rejection metrics will be populated. + */ + public static final Setting IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE = new Setting<>( + "admission_controller.transport.global_io_usage.mode", + Defaults.MODE, + AdmissionControllerMode::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting> IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST = Setting.listSetting( + "admission_controller.transport.global_io_usage.uri_list", + Defaults.TRANSPORT_LAYER_DEFAULT_URI_TYPE, + TransportRequestURIType::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + // currently limited to one setting will add further more settings in follow-up PR's + public IOBasedAdmissionControllerSettings( + ClusterSettings clusterSettings, + Settings settings, + IOBasedAdmissionController ioBasedAdmissionController + ) { + this.ioBasedAdmissionController = ioBasedAdmissionController; + this.transportLayerMode = IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); + clusterSettings.addSettingsUpdateConsumer(IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); + this.transportRequestURIList = IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST.get(settings); + clusterSettings.addSettingsUpdateConsumer( + IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST, + this::setIoBasedAdmissionControllerTransportUriList + ); + } + + private void setTransportLayerMode(AdmissionControllerMode admissionControllerMode) { + this.transportLayerMode = admissionControllerMode; + } + + private void setIoBasedAdmissionControllerTransportUriList(List transportRequestURIList) { + this.transportRequestURIList = transportRequestURIList; + } + + public IOBasedAdmissionController getIoBasedAdmissionController() { + return ioBasedAdmissionController; + } + + public AdmissionControllerMode getTransportLayerAdmissionControllerMode() { + return transportLayerMode; + } + + public List getTransportRequestURIList() { + return transportRequestURIList; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportHandler.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportHandler.java new file mode 100644 index 0000000000000..b2407f3efff02 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportHandler.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.transport; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.tasks.Task; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * AdmissionController Handler to intercept Transport Requests. + * @param Transport Request + */ +public class AdmissionControllerTransportHandler implements TransportRequestHandler { + + private final String action; + private final TransportRequestHandler actualHandler; + protected final Logger log = LogManager.getLogger(this.getClass()); + AdmissionControllerService admissionControllerService; + + public AdmissionControllerTransportHandler( + String action, + TransportRequestHandler actualHandler, + AdmissionControllerService admissionControllerService + ) { + super(); + this.action = action; + this.actualHandler = actualHandler; + this.admissionControllerService = admissionControllerService; + } + + /** + * @param request Transport Request that landed on the node + * @param channel Transport channel allows to send a response to a request + * @param task Current task that is executing + * @throws Exception when admission control rejected the requests + */ + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + // intercept all the transport requests here and apply admission control + try { + this.admissionControllerService.applyTransportLayerAdmissionController(this.action); + } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { + channel.sendResponse(openSearchRejectedExecutionException); + throw openSearchRejectedExecutionException; + } catch (final Exception e) { + throw e; + } + this.messageReceivedDecorate(request, actualHandler, channel, task); + } + + /** + * + * @param request Transport Request that landed on the node + * @param actualHandler is the next handler to intercept the request + * @param transportChannel Transport channel allows to send a response to a request + * @param task Current task that is executing + * @throws Exception + */ + protected void messageReceivedDecorate( + final T request, + final TransportRequestHandler actualHandler, + final TransportChannel transportChannel, + Task task + ) throws Exception { + actualHandler.messageReceived(request, transportChannel, task); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportInterceptor.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportInterceptor.java new file mode 100644 index 0000000000000..7d079bcb86e1a --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportInterceptor.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.transport; + +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * This class allows throttling to intercept requests on both the sender and the receiver side. + */ +public class AdmissionControllerTransportInterceptor implements TransportInterceptor { + + AdmissionControllerService admissionControllerService; + + public AdmissionControllerTransportInterceptor(AdmissionControllerService admissionControllerService) { + this.admissionControllerService = admissionControllerService; + } + + /** + * + * @return admissionController handler to intercept transport requests + */ + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + return new AdmissionControllerTransportHandler<>(action, actualHandler, this.admissionControllerService); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java new file mode 100644 index 0000000000000..733e9647b71e7 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.transport; diff --git a/server/src/main/java/org/opensearch/throttling/package-info.java b/server/src/main/java/org/opensearch/throttling/package-info.java new file mode 100644 index 0000000000000..85a43dc86789d --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling; diff --git a/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerServiceTests.java b/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerServiceTests.java new file mode 100644 index 0000000000000..386465779052b --- /dev/null +++ b/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerServiceTests.java @@ -0,0 +1,158 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.controllers.AdmissionController; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControllerMode; +import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings; + +import java.util.List; + +public class AdmissionControllerServiceTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + private AdmissionControllerService admissionControllerService; + + private String action = ""; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + action = "indexing"; + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testWhenAdmissionControllerDisabled() { + admissionControllerService = new AdmissionControllerService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + assertFalse(admissionControllerService.isTransportLayerAdmissionControllerEnabled()); + assertFalse(admissionControllerService.isTransportLayerAdmissionControllerEnforced()); + + Settings settings = Settings.builder() + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControllerMode.MONITOR.getMode()) + .build(); + + clusterService.getClusterSettings().applySettings(settings); + assertTrue(admissionControllerService.isTransportLayerAdmissionControllerEnabled()); + assertFalse(admissionControllerService.isTransportLayerAdmissionControllerEnforced()); + + settings = Settings.builder() + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControllerMode.ENFORCED.getMode()) + .build(); + + clusterService.getClusterSettings().applySettings(settings); + assertTrue(admissionControllerService.isTransportLayerAdmissionControllerEnabled()); + assertTrue(admissionControllerService.isTransportLayerAdmissionControllerEnforced()); + } + + public void testWhenAdmissionControllerRegistered() { + admissionControllerService = new AdmissionControllerService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + assertEquals(admissionControllerService.getListAdmissionControllers().size(), 1); + } + + public void testApplyAdmissionControllerDisabled() { + admissionControllerService = new AdmissionControllerService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControllerService.applyTransportLayerAdmissionController(this.action); + List admissionControllerList = admissionControllerService.getListAdmissionControllers(); + admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(), 0); }); + } + + public void testApplyAdmissionControllerEnabled() { + admissionControllerService = new AdmissionControllerService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControllerService.applyTransportLayerAdmissionController(this.action); + assertEquals( + admissionControllerService.getAdmissionController(AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER) + .getRejectionCount(), + 0 + ); + + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControllerMode.MONITOR.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + + admissionControllerService.applyTransportLayerAdmissionController(this.action); + List admissionControllerList = admissionControllerService.getListAdmissionControllers(); + assertEquals(admissionControllerList.size(), 1); + assertEquals( + admissionControllerService.getAdmissionController(AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER) + .getRejectionCount(), + 1 + ); + } + + public void testWhenForceEnablementDisabled() { + admissionControllerService = new AdmissionControllerService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControllerMode.MONITOR.getMode() + ) + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControllerMode.DISABLED.getMode()) + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.getKey(), false) + .build(); + clusterService.getClusterSettings().applySettings(settings); + admissionControllerService.applyTransportLayerAdmissionController(this.action); + assertEquals( + admissionControllerService.getAdmissionController(AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER) + .getRejectionCount(), + 1 + ); + } + + public void testWhenForceEnablementEnabled() { + admissionControllerService = new AdmissionControllerService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControllerMode.MONITOR.getMode() + ) + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControllerMode.DISABLED.getMode()) + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.getKey(), true) + .build(); + clusterService.getClusterSettings().applySettings(settings); + admissionControllerService.applyTransportLayerAdmissionController(this.action); + assertEquals( + admissionControllerService.getAdmissionController(AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER) + .getRejectionCount(), + 0 + ); + + Settings newSettings = Settings.builder() + .put(settings) + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.getKey(), false) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + admissionControllerService.applyTransportLayerAdmissionController(this.action); + assertEquals( + admissionControllerService.getAdmissionController(AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER) + .getRejectionCount(), + 1 + ); + } +} diff --git a/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettingsTests.java b/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettingsTests.java new file mode 100644 index 0000000000000..bc98c22684f84 --- /dev/null +++ b/server/src/test/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettingsTests.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControllerMode; + +import java.util.Arrays; +import java.util.Set; + +import static org.mockito.Mockito.mock; + +public class AdmissionControllerSettingsTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testSettingsExists() { + Set> settings = ClusterSettings.BUILT_IN_CLUSTER_SETTINGS; + assertTrue( + "All the admission controller settings should be supported built in settings", + settings.containsAll( + Arrays.asList( + AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + AdmissionControllerSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING + ) + ) + ); + } + + public void testDefaultSettings() { + AdmissionControllerSettings admissionControllerSettings = new AdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(AdmissionControllerService.class) + ); + + assertFalse(admissionControllerSettings.isTransportLayerAdmissionControllerEnabled()); + assertFalse(admissionControllerSettings.isTransportLayerAdmissionControllerEnforced()); + assertFalse(admissionControllerSettings.isForceDefaultSettingsEnabled()); + } + + public void testGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControllerMode.ENFORCED.getMode()) + .build(); + + AdmissionControllerSettings admissionControllerSettings = new AdmissionControllerSettings( + clusterService.getClusterSettings(), + settings, + mock(AdmissionControllerService.class) + ); + + assertTrue(admissionControllerSettings.isTransportLayerAdmissionControllerEnabled()); + assertTrue(admissionControllerSettings.isTransportLayerAdmissionControllerEnforced()); + assertFalse(admissionControllerSettings.isForceDefaultSettingsEnabled()); + } + + public void testUpdateAfterGetDefaultSettings() { + AdmissionControllerSettings admissionControllerSettings = new AdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(AdmissionControllerService.class) + ); + Settings settings = Settings.builder() + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControllerMode.MONITOR.getMode()) + .build(); + clusterService.getClusterSettings().applySettings(settings); + assertTrue(admissionControllerSettings.isTransportLayerAdmissionControllerEnabled()); + assertFalse(admissionControllerSettings.isTransportLayerAdmissionControllerEnforced()); + assertFalse(admissionControllerSettings.isForceDefaultSettingsEnabled()); + } + + public void testUpdateAfterGetConfiguredSettings() { + Settings settings = Settings.builder() + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControllerMode.MONITOR.getMode()) + .build(); + + AdmissionControllerSettings admissionControllerSettings = new AdmissionControllerSettings( + clusterService.getClusterSettings(), + settings, + mock(AdmissionControllerService.class) + ); + + Settings newSettings = Settings.builder() + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControllerMode.ENFORCED.getMode()) + .build(); + + clusterService.getClusterSettings().applySettings(newSettings); + + assertTrue(admissionControllerSettings.isTransportLayerAdmissionControllerEnabled()); + assertTrue(admissionControllerSettings.isTransportLayerAdmissionControllerEnforced()); + assertFalse(admissionControllerSettings.isForceDefaultSettingsEnabled()); + + newSettings = Settings.builder() + .put(newSettings) + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.getKey(), true) + .build(); + + clusterService.getClusterSettings().applySettings(newSettings); + assertTrue(admissionControllerSettings.isForceDefaultSettingsEnabled()); + assertTrue(admissionControllerSettings.isTransportLayerAdmissionControllerEnabled()); + assertTrue(admissionControllerSettings.isTransportLayerAdmissionControllerEnforced()); + } +} diff --git a/server/src/test/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionControllerTests.java new file mode 100644 index 0000000000000..a945bba0c7c2d --- /dev/null +++ b/server/src/test/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionControllerTests.java @@ -0,0 +1,158 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControllerMode; +import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings; + +import static org.mockito.Mockito.mock; + +public class IOBasedAdmissionControllerTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + IOBasedAdmissionController ioBasedAdmissionController = null; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testCheckDefaultParameters() { + ioBasedAdmissionController = new IOBasedAdmissionController( + AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER, + Settings.EMPTY, + clusterService.getClusterSettings() + ); + assertEquals(ioBasedAdmissionController.getName(), AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + assertEquals( + ioBasedAdmissionController.ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), + AdmissionControllerMode.DISABLED + ); + assertFalse(ioBasedAdmissionController.isEnabledForTransportLayer()); + } + + public void testCheckUpdateSettings() { + ioBasedAdmissionController = new IOBasedAdmissionController( + AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER, + Settings.EMPTY, + clusterService.getClusterSettings() + ); + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControllerMode.ENFORCED.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + + assertEquals(ioBasedAdmissionController.getName(), AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + assertEquals( + ioBasedAdmissionController.ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), + AdmissionControllerMode.ENFORCED + ); + assertTrue(ioBasedAdmissionController.isEnabledForTransportLayer()); + } + + public void testApplyControllerWithDefaultSettings() { + ioBasedAdmissionController = new IOBasedAdmissionController( + AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER, + Settings.EMPTY, + clusterService.getClusterSettings() + ); + AdmissionControllerSettings admissionControllerSettings = new AdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(AdmissionControllerService.class) + ); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + assertEquals( + ioBasedAdmissionController.ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), + AdmissionControllerMode.DISABLED + ); + ioBasedAdmissionController.acquire(admissionControllerSettings); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + } + + public void testApplyControllerWhenSettingsEnabled() { + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControllerMode.ENFORCED.getMode() + ) + .build(); + ioBasedAdmissionController = new IOBasedAdmissionController( + AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER, + settings, + clusterService.getClusterSettings() + ); + assertTrue(ioBasedAdmissionController.isEnabledForTransportLayer()); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + AdmissionControllerSettings admissionControllerSettings = new AdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(AdmissionControllerService.class) + ); + ioBasedAdmissionController.acquire(admissionControllerSettings); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 1); + } + + public void testApplyControllerWhenForceDefaultEnabled() { + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControllerMode.ENFORCED.getMode() + ) + .build(); + ioBasedAdmissionController = new IOBasedAdmissionController( + AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER, + settings, + clusterService.getClusterSettings() + ); + assertTrue(ioBasedAdmissionController.isEnabledForTransportLayer()); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 0); + AdmissionControllerSettings admissionControllerSettings = new AdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(AdmissionControllerService.class) + ); + ioBasedAdmissionController.acquire(admissionControllerSettings); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 1); + Settings newSettings = Settings.builder() + .put(settings) + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.getKey(), true) + .put(AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControllerMode.DISABLED) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + assertTrue(ioBasedAdmissionController.isEnabledForTransportLayer()); + ioBasedAdmissionController.acquire(admissionControllerSettings); + assertEquals(ioBasedAdmissionController.getRejectionCount(), 1); + } +} diff --git a/server/src/test/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettingsTests.java b/server/src/test/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettingsTests.java new file mode 100644 index 0000000000000..e3ebb22281fc6 --- /dev/null +++ b/server/src/test/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettingsTests.java @@ -0,0 +1,152 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.settings; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControllerMode; +import org.opensearch.throttling.admissioncontroller.enums.TransportRequestURIType; + +import java.util.Arrays; +import java.util.Set; + +import static org.mockito.Mockito.mock; + +public class IOBasedAdmissionControllerSettingsTests extends OpenSearchTestCase { + private ClusterService clusterService; + private ThreadPool threadPool; + + @Override + public void setUp() throws Exception { + super.setUp(); + threadPool = new TestThreadPool("admission_controller_settings_test"); + clusterService = new ClusterService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool + ); + } + + @Override + public void tearDown() throws Exception { + super.tearDown(); + threadPool.shutdownNow(); + } + + public void testSettingsExists() { + Set> settings = ClusterSettings.BUILT_IN_CLUSTER_SETTINGS; + assertTrue( + "All the io based admission controller settings should be supported built in settings", + settings.containsAll( + Arrays.asList( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST + ) + ) + ); + } + + public void testDefaultSettings() { + IOBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IOBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(IOBasedAdmissionController.class) + ); + assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControllerMode.DISABLED); + assertEquals( + ioBasedAdmissionControllerSettings.getTransportRequestURIList(), + Arrays.asList(TransportRequestURIType.INDEXING, TransportRequestURIType.SEARCH) + ); + } + + public void testGetConfiguredSettings() { + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControllerMode.ENFORCED.getMode() + ) + .putList( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST.getKey(), + Arrays.asList("indexing_primary", "search") + ) + .build(); + + IOBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IOBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + settings, + mock(IOBasedAdmissionController.class) + ); + assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControllerMode.ENFORCED); + assertEquals( + ioBasedAdmissionControllerSettings.getTransportRequestURIList(), + Arrays.asList(TransportRequestURIType.INDEXING_PRIMARY, TransportRequestURIType.SEARCH) + ); + } + + public void testUpdateAfterGetDefaultSettings() { + IOBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IOBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + Settings.EMPTY, + mock(IOBasedAdmissionController.class) + ); + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControllerMode.ENFORCED.getMode() + ) + .putList( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST.getKey(), + Arrays.asList("indexing_replica", "search") + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControllerMode.ENFORCED); + assertEquals( + ioBasedAdmissionControllerSettings.getTransportRequestURIList(), + Arrays.asList(TransportRequestURIType.INDEXING_REPLICA, TransportRequestURIType.SEARCH) + ); + } + + public void testUpdateAfterGetConfiguredSettings() { + Settings settings = Settings.builder() + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControllerMode.ENFORCED.getMode() + ) + .putList( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST.getKey(), + Arrays.asList("indexing_replica", "search") + ) + .build(); + + IOBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings = new IOBasedAdmissionControllerSettings( + clusterService.getClusterSettings(), + settings, + mock(IOBasedAdmissionController.class) + ); + assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControllerMode.ENFORCED); + Settings newSettings = Settings.builder() + .put(settings) + .put( + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControllerMode.MONITOR.getMode() + ) + .putList(IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST.getKey(), Arrays.asList("search")) + .build(); + clusterService.getClusterSettings().applySettings(newSettings); + assertEquals(ioBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControllerMode.MONITOR); + assertEquals(ioBasedAdmissionControllerSettings.getTransportRequestURIList(), Arrays.asList(TransportRequestURIType.SEARCH)); + } +} diff --git a/server/src/test/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportHandlerTests.java b/server/src/test/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportHandlerTests.java new file mode 100644 index 0000000000000..604a120f76d2b --- /dev/null +++ b/server/src/test/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportHandlerTests.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.transport; + +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.tasks.Task; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class AdmissionControllerTransportHandlerTests extends OpenSearchTestCase { + AdmissionControllerTransportHandler admissionControllerTransportHandler; + private String testAction = ""; + + public void testHandlerInvoked() throws Exception { + String action = "TEST"; + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControllerTransportHandler = new AdmissionControllerTransportHandler( + action, + handler, + mock(AdmissionControllerService.class) + ); + admissionControllerTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + assertEquals(1, handler.count); + } + + public void testHandlerInvokedRejectedException() throws Exception { + String action = "TEST"; + AdmissionControllerService admissionControllerService = mock(AdmissionControllerService.class); + when(admissionControllerService.applyTransportLayerAdmissionController(testAction)).thenThrow( + new OpenSearchRejectedExecutionException() + ); + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControllerTransportHandler = new AdmissionControllerTransportHandler( + action, + handler, + admissionControllerService + ); + try { + admissionControllerTransportHandler.messageReceived( + mock(TransportRequest.class), + mock(TransportChannel.class), + mock(Task.class) + ); + } catch (OpenSearchRejectedExecutionException exception) { + assertEquals(0, handler.count); + handler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + } + assertEquals(1, handler.count); + } + + public void testHandlerInvokedRandomException() throws Exception { + String action = "TEST"; + AdmissionControllerService admissionControllerService = mock(AdmissionControllerService.class); + when(admissionControllerService.applyTransportLayerAdmissionController(testAction)).thenThrow(new NullPointerException()); + InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); + admissionControllerTransportHandler = new AdmissionControllerTransportHandler( + action, + handler, + admissionControllerService + ); + try { + admissionControllerTransportHandler.messageReceived( + mock(TransportRequest.class), + mock(TransportChannel.class), + mock(Task.class) + ); + } catch (Exception exception) { + assertEquals(0, handler.count); + handler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); + } + assertEquals(1, handler.count); + } + + private class InterceptingRequestHandler implements TransportRequestHandler { + private final String action; + public int count; + + public InterceptingRequestHandler(String action) { + this.action = action; + this.count = 0; + } + + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + this.count = this.count + 1; + } + } +} diff --git a/settings.gradle b/settings.gradle index c04b5997d49b1..009c8b732fdd5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -153,3 +153,6 @@ if (extraProjects.exists()) { addSubProjects('', extraProjectDir) } } +include 'modules:throttling' +findProject(':modules:throttling')?.name = 'throttling' +