diff --git a/modules/throttling/build.gradle b/modules/throttling/build.gradle new file mode 100644 index 0000000000000..40fef23c137e4 --- /dev/null +++ b/modules/throttling/build.gradle @@ -0,0 +1,14 @@ +apply plugin: 'opensearch.java-rest-test' + +opensearchplugin { + description 'Plugin intercepting requests and throttle based on resource consumption' + classname 'org.opensearch.throttling.OpenSearchThrottlingModulePlugin' +} + +dependencies { + api project(path: ':modules:reindex') +} + +testClusters.all { + module ':modules:reindex' +} diff --git a/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java b/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java new file mode 100644 index 0000000000000..b53cf2829804b --- /dev/null +++ b/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java @@ -0,0 +1,46 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling; + +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; +import org.opensearch.throttling.admissioncontroller.transport.AdmissionControllerTransportInterceptor; +import org.opensearch.transport.TransportInterceptor; + +import java.util.ArrayList; +import java.util.List; + +/** + * This plugin is used to register handlers to intercept both rest and transport requests. + */ +public class OpenSearchThrottlingModulePlugin extends Plugin implements NetworkPlugin { + + AdmissionControllerService admissionControllerService; + + /** + * Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing + * transport (inter-node) requests. This must not return null + * + * @param namedWriteableRegistry registry of all named writeables registered + * @param threadContext a {@link ThreadContext} of the current nodes or clients that can be used to set additional + * headers in the interceptors + * @param admissionControllerService Services that allowed during the transport interception of requests. + * @return + */ + @Override + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext, AdmissionControllerService admissionControllerService) { + this.admissionControllerService = admissionControllerService; + List interceptors = new ArrayList<>(0); + interceptors.add(new AdmissionControllerTransportInterceptor(this.admissionControllerService)); + return interceptors; + } +} diff --git a/modules/throttling/src/main/plugin-metadata/plugin-security.policy b/modules/throttling/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 0000000000000..ccfd6ba70dd16 --- /dev/null +++ b/modules/throttling/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +grant { + // needed to generate runtime classes + permission java.lang.RuntimePermission "createClassLoader"; + + // needed to find the classloader to load allowlisted classes from + permission java.lang.RuntimePermission "getClassLoader"; +}; diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 3539ea7f3f526..cd75df0fb6416 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -58,10 +58,8 @@ import org.opensearch.tasks.RawTaskStatus; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; -import org.opensearch.transport.Transport; -import org.opensearch.transport.TransportInterceptor; -import org.opensearch.transport.TransportRequest; -import org.opensearch.transport.TransportRequestHandler; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; +import org.opensearch.transport.*; import java.io.IOException; import java.util.ArrayList; @@ -143,6 +141,7 @@ public NetworkModule( BigArrays bigArrays, PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService, + AdmissionControllerService admissionControllerService, NamedWriteableRegistry namedWriteableRegistry, NamedXContentRegistry xContentRegistry, NetworkService networkService, @@ -178,8 +177,13 @@ public NetworkModule( } List transportInterceptors = plugin.getTransportInterceptors( namedWriteableRegistry, - threadPool.getThreadContext() + threadPool.getThreadContext(), + admissionControllerService ); + transportInterceptors.addAll(plugin.getTransportInterceptors( + namedWriteableRegistry, + threadPool.getThreadContext() + )); for (TransportInterceptor interceptor : transportInterceptors) { registerTransportInterceptor(interceptor); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 32d14a3519659..d4ca22cdef44d 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -148,6 +148,9 @@ import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings; +import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController; +import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings; import org.opensearch.transport.ProxyConnectionStrategy; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.RemoteConnectionStrategy; @@ -233,6 +236,10 @@ public void apply(Settings value, Settings current, Settings previous) { public static Set> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet( new HashSet<>( Arrays.asList( + AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, + AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_URI_LIST_SETTING, + AdmissionControllerSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING, + IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_MODE, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, diff --git a/server/src/main/java/org/opensearch/common/settings/Settings.java b/server/src/main/java/org/opensearch/common/settings/Settings.java index 8f3bf1fd66b81..d992bc33589e8 100644 --- a/server/src/main/java/org/opensearch/common/settings/Settings.java +++ b/server/src/main/java/org/opensearch/common/settings/Settings.java @@ -902,7 +902,7 @@ public Builder copy(String key, String sourceKey, Settings source) { /** * Sets a null value for the given setting key */ - public Builder putNull(String key) { + public Builder putNull(String key) { return put(key, (String) null); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 51cc7c9007159..9e516a12ecc4f 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -222,6 +222,7 @@ import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportInterceptor; @@ -749,6 +750,12 @@ protected Node( fileCacheCleaner ); + final AdmissionControllerService admissionControllerService = new AdmissionControllerService( + settings, + clusterService.getClusterSettings(), + threadPool + ); + final AliasValidator aliasValidator = new AliasValidator(); final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices); @@ -827,6 +834,7 @@ protected Node( bigArrays, pageCacheRecycler, circuitBreakerService, + admissionControllerService, namedWriteableRegistry, xContentRegistry, networkService, @@ -1094,6 +1102,7 @@ protected Node( b.bind(IndexingPressureService.class).toInstance(indexingPressureService); b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService); b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService); + b.bind(AdmissionControllerService.class).toInstance(admissionControllerService); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); @@ -1263,6 +1272,7 @@ public Node start() throws NodeValidationException { injector.getInstance(RepositoriesService.class).start(); injector.getInstance(SearchService.class).start(); injector.getInstance(FsHealthService.class).start(); + injector.getInstance(AdmissionControllerService.class).start(); nodeService.getMonitorService().start(); nodeService.getSearchBackpressureService().start(); nodeService.getTaskCancellationMonitoringService().start(); @@ -1418,6 +1428,7 @@ private Node stop() { injector.getInstance(ClusterService.class).stop(); injector.getInstance(NodeConnectionsService.class).stop(); injector.getInstance(FsHealthService.class).stop(); + injector.getInstance(AdmissionControllerService.class).stop(); nodeService.getMonitorService().stop(); nodeService.getSearchBackpressureService().stop(); injector.getInstance(GatewayService.class).stop(); @@ -1481,6 +1492,8 @@ public synchronized void close() throws IOException { toClose.add(nodeService.getSearchBackpressureService()); toClose.add(() -> stopWatch.stop().start("fsHealth")); toClose.add(injector.getInstance(FsHealthService.class)); + toClose.add(() -> stopWatch.stop().start("admission_controller")); + toClose.add(injector.getInstance(AdmissionControllerService.class)); toClose.add(() -> stopWatch.stop().start("gateway")); toClose.add(injector.getInstance(GatewayService.class)); toClose.add(() -> stopWatch.stop().start("search")); diff --git a/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java b/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java index de0aecc7833c9..207bc5c6f062c 100644 --- a/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/NetworkPlugin.java @@ -42,6 +42,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.http.HttpServerTransport; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportInterceptor; @@ -72,6 +73,14 @@ default List getTransportInterceptors( return Collections.emptyList(); } + default List getTransportInterceptors( + NamedWriteableRegistry namedWriteableRegistry, + ThreadContext threadContext, + AdmissionControllerService admissionControllerService + ) { + return Collections.emptyList(); + } + /** * Returns a map of {@link Transport} suppliers. * See {@link org.opensearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation. diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerService.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerService.java new file mode 100644 index 0000000000000..13e7615348213 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerService.java @@ -0,0 +1,162 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.controllers.AdmissionController; +import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController; +import static org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch. + */ +public class AdmissionControllerService extends AbstractLifecycleComponent { + private final ThreadPool threadPool; + public final AdmissionControllerSettings admissionControllerSettings; + private final ConcurrentMap ADMISSION_CONTROLLERS; + private static AdmissionControllerService admissionControllerService = null; + private static final Logger logger = LogManager.getLogger(AdmissionControllerService.class); + private final ClusterSettings clusterSettings; + private final Settings settings; + + /** + * + * @param settings Immutable settings instance + * @param clusterSettings ClusterSettings Instance + * @param threadPool ThreadPool Instance + */ + public AdmissionControllerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + this.threadPool = threadPool; + this.admissionControllerSettings = new AdmissionControllerSettings(clusterSettings, settings); + this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>(); + this.clusterSettings = clusterSettings; + this.settings = settings; + } + + /** + * Initialise and Register all the admissionControllers + */ + private void initialise() { + // Initialise different type of admission controllers + registerAdmissionController(this.getDefaultAdmissionControllerState(IO_BASED_ADMISSION_CONTROLLER)); + } + + private AdmissionControllerState getDefaultAdmissionControllerState(String controllerName){ + return new AdmissionControllerState( + controllerName, + this.admissionControllerSettings.getAdmissionControllerTransportLayerMode(), + this.admissionControllerSettings.getDefaultTransportLayerUriList() + ); + } + + /** + * Handler to trigger registered admissionController + */ + public void applyTransportLayerAdmissionController() { + if (this.isTransportLayerAdmissionControllerEnabled()) { + this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { + if(admissionController.getAdmissionControllerState().isAdmissionControllerTransportLayerEnabled()) { + admissionController.acquire(admissionControllerSettings); + } + }); + } + } + + /** + * + */ + @Override + protected void doStart() { + this.initialise(); + } + + /** + * + * @return singleton admissionControllerService Instance + */ + public static AdmissionControllerService getInstance() { + return admissionControllerService; + } + + /** + * + * @param settings Immutable settings instance + * @param clusterSettings ClusterSettings Instance + * @param threadPool ThreadPool Instance + * @return singleton admissionControllerService Instance + */ + public static synchronized AdmissionControllerService newAdmissionControllerService( + Settings settings, + ClusterSettings clusterSettings, + ThreadPool threadPool + ) { + if(admissionControllerService == null) { + admissionControllerService = new AdmissionControllerService(settings, clusterSettings, threadPool); + } + return admissionControllerService; + } + + @Override + protected void doStop() {} + + /** + * @throws IOException + */ + @Override + protected void doClose() throws IOException {} + + /** + * + * @return true if the admissionController Feature is enabled + */ + public Boolean isTransportLayerAdmissionControllerEnabled() { + return this.admissionControllerSettings.isTransportLayerAdmissionControllerEnforced(); + } + + /** + * + * @param admissionControllerState admissionControllerState to register into the service. + */ + public void registerAdmissionController(AdmissionControllerState admissionControllerState) { + AdmissionController admissionController = this.controllerFactory(admissionControllerState); + if (admissionController != null) { + this.ADMISSION_CONTROLLERS.put(admissionControllerState.getControllerName(), admissionController); + } + } + + /** + * + * @param admissionControllerState admissionControllerState to create instance + * @return AdmissionController Instance + */ + private AdmissionController controllerFactory(AdmissionControllerState admissionControllerState) { + switch (admissionControllerState.getControllerName()) { + case IO_BASED_ADMISSION_CONTROLLER: + return new IOBasedAdmissionController(admissionControllerState, this.settings, this.clusterSettings); + default: + return null; + } + } + + public List getListAdmissionControllers() { + return new ArrayList<>(this.ADMISSION_CONTROLLERS.values()); + } + +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettings.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettings.java new file mode 100644 index 0000000000000..14172f8f9da23 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettings.java @@ -0,0 +1,135 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.opensearch.action.bulk.BulkAction; +import org.opensearch.action.search.SearchAction; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControllerMode; +import org.opensearch.throttling.admissioncontroller.enums.TransportRequestURIType; + +import java.util.Arrays; +import java.util.List; + +/** + * Settings related to admission controller. + * @opensearch.internal + */ +public final class AdmissionControllerSettings { + + public static class Defaults { + public static final String MODE = "disabled"; + public static final List DEFAULT_TRANSPORT_URI_LIST_SETTING = Arrays.asList(BulkAction.NAME + "[s][p]", SearchAction.NAME); + } + + public static final String IO_BASED_ADMISSION_CONTROLLER = "global_io_usage"; + + /** + * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set + * rejection will be performed, otherwise only rejection metrics will be populated. + */ + + public static final Setting ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE = new Setting<>( + "admission_controller.transport.mode", + Defaults.MODE, + AdmissionControllerMode::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * This setting provides the selectness of URI patterns which consumers can Match and + * filter the application of Admission controller + */ + public static final Setting> ADMISSION_CONTROLLER_TRANSPORT_URI_LIST_SETTING = Setting.listSetting( + "admission_controller.transport.uri_list", + Defaults.DEFAULT_TRANSPORT_URI_LIST_SETTING, + TransportRequestURIType::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + public static final Setting ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING = Setting.boolSetting( + "admission_controller.force_default_settings", + false, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private volatile Boolean forceEnableDefault; + + private List transportLayerAdmissionControllerURIList; + + private volatile AdmissionControllerMode transportLayeradmissionControllerMode; + + /** + * + * @param clusterSettings clusterSettings Instance + * @param settings + */ + public AdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) { + this.transportLayeradmissionControllerMode = ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings); + this.transportLayerAdmissionControllerURIList = ADMISSION_CONTROLLER_TRANSPORT_URI_LIST_SETTING.get(settings); + this.forceEnableDefault = ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(ADMISSION_CONTROLLER_TRANSPORT_URI_LIST_SETTING, this::setDefaultTransportLayerURIList); + clusterSettings.addSettingsUpdateConsumer(ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING,this::setForceEnableDefaultSettings); + clusterSettings.addSettingsUpdateConsumer(ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,this::setAdmissionControllerTransportLayerMode); + } + + /** + * + * @param admissionControllerMode update the mode of admission controller feature + */ + private void setAdmissionControllerTransportLayerMode(AdmissionControllerMode admissionControllerMode) { + this.transportLayeradmissionControllerMode = admissionControllerMode; + } + + public AdmissionControllerMode getAdmissionControllerTransportLayerMode() { + return this.transportLayeradmissionControllerMode; + } + + public void setForceEnableDefaultSettings(Boolean forceEnableDefault) { + this.forceEnableDefault = forceEnableDefault; + } + + public Boolean isForceDefaultSettingsEnabled() { + return this.forceEnableDefault; + } + + /** + * + * @return boolean based on the admission controller feature is enforced or not + */ + public Boolean isTransportLayerAdmissionControllerEnforced() { + return this.transportLayeradmissionControllerMode == AdmissionControllerMode.ENFORCED; + } + + + /** + * + * @return boolean based on the admission controller feature is enabled or not + */ + public Boolean isTransportLayerAdmissionControllerEnabled() { + return this.transportLayeradmissionControllerMode != AdmissionControllerMode.DISABLED; + } + + /** + * + * @return list of the default URI admission controller feature is applied + */ + public List getDefaultTransportLayerUriList() { + return this.transportLayerAdmissionControllerURIList; + } + + public void setDefaultTransportLayerURIList(List defaultAdmissionControllerURIList) { + this.transportLayerAdmissionControllerURIList = defaultAdmissionControllerURIList; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerState.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerState.java new file mode 100644 index 0000000000000..dac83ce187a4a --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerState.java @@ -0,0 +1,85 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControllerMode; +import org.opensearch.throttling.admissioncontroller.enums.TransportRequestURIType; + +import java.util.List; + +/** + * Defines the current state of the admission controller + */ +public class AdmissionControllerState { + private AdmissionControllerMode transportLayerMode; + private List transportRequestURITypeList; + private String controllerName; + + /** + * + * @param controllerName Name of the admission controller + * @param transportLayerMode Mode of the transportLayer admission controller + * @param applyURIList URI list for which the admission controller need to be applied. + */ + public AdmissionControllerState(String controllerName, AdmissionControllerMode transportLayerMode, List applyURIList) { + this.transportRequestURITypeList = applyURIList; + this.transportLayerMode = transportLayerMode; + this.controllerName = controllerName; + } + + /** + * + * @return mode of the admission-controller + */ + public AdmissionControllerMode getTransportLayerMode() { + return this.transportLayerMode; + } + + public Boolean isAdmissionControllerTransportLayerEnabled() { + return this.transportLayerMode != AdmissionControllerMode.DISABLED; + } + + /** + * @param mode Update the mode for admission controller + */ + public void setMode(AdmissionControllerMode mode) { + this.transportLayerMode = mode; + } + + /** + * + * @return list of the URI where the admission controller needs to applied + */ + public List getApplyURIList() { + return transportRequestURITypeList; + } + + /** + * + * @param applyURIList update the URI list where the admission controller needs to applied + */ + public void setApplyURIList(List applyURIList) { + this.transportRequestURITypeList = applyURIList; + } + + /** + * + * @return name of the admission controller + */ + public String getControllerName() { + return controllerName; + } + + /** + * @param controllerName update the name of the admission controller + */ + private void setControllerName(String controllerName) { + this.controllerName = controllerName; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java new file mode 100644 index 0000000000000..7337a9f90550c --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; + +import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerState; + +/** + * Interface for Admission Controller in OpenSearch, which aims to provide resource based request admission control. + * It provides methods for any tracking-object that can be incremented (such as memory size), + * and admission control can be applied if configured limit has been reached + */ +public interface AdmissionController { + + /** + * Return the current state of the admission controller + * @return + */ + AdmissionControllerState getAdmissionControllerState(); + + /** + * Increment the tracking-objects and apply the admission control if threshold is breached. + * Mostly applicable while acquiring the quota. + * @return boolean if admission controller able to acquire the requested quota. + */ + boolean acquire(AdmissionControllerSettings admissionControllerSettings); + + /** + * Decrement the tracking-objects and do not apply the admission control. + * Mostly applicable while remitting the quota. + * @return boolean if admission controller is released the acquired quota. + */ + boolean release(); + + /** + * @return name of the admission-controller + */ + String getName(); +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java new file mode 100644 index 0000000000000..8ace72f1ee1ce --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControllerMode; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerState; +import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings; + +/** + * Class for IO Based Admission Controller in OpenSearch, which aims to provide IO utilisation admission control. + * It provides methods to apply admission control if configured limit has been reached + */ +public class IOBasedAdmissionController implements AdmissionController { + private static final Logger LOGGER = LogManager.getLogger(IOBasedAdmissionController.class); + private final AdmissionControllerState admissionControllerState; + + public IOBasedAdmissionControllerSettings ioBasedAdmissionControllerSettings; + private long rejectionCount; + + /** + * + * @param admissionControllerState State of the admission controller + */ + public IOBasedAdmissionController(AdmissionControllerState admissionControllerState, Settings settings, ClusterSettings clusterSettings) { + this.admissionControllerState = admissionControllerState; + this.ioBasedAdmissionControllerSettings = new IOBasedAdmissionControllerSettings(clusterSettings, settings, this.admissionControllerState); + this.rejectionCount = 0; + } + + private void setIoBasedAdmissionControllerMode(AdmissionControllerMode admissionControllerMode) { + this.admissionControllerState.setMode(admissionControllerMode); + } + + /** + * @return admissionControllerState + */ + @Override + public AdmissionControllerState getAdmissionControllerState() { + return this.admissionControllerState; + } + + /** + * This function will take of applying admission controller based on IO usage + * @return Boolean if the admission controller is applied successfully + */ + @Override + public boolean acquire(AdmissionControllerSettings admissionControllerSettings) { + // TODO Will extend this logic further currently just incrementing rejectionCount + Boolean isEnabled = this.admissionControllerState.isAdmissionControllerTransportLayerEnabled(); + if(admissionControllerSettings.isForceDefaultSettingsEnabled()){ + isEnabled = admissionControllerSettings.isTransportLayerAdmissionControllerEnabled(); + } + if(isEnabled){ + this.incrementRejectionCount(1); + } + return true; + } + + /** + * @return Boolean if the admission controller cleared the objects that acquired + */ + @Override + public boolean release() { + return false; + } + + /** + * @return name of the admission Controller + */ + @Override + public String getName() { + return this.admissionControllerState.getControllerName(); + } + + /** + * @return rejection count of the admission controller + */ + public long getRejectionCount() { + return rejectionCount; + } + + /** + * + * @param count update the count of the rejectionCount + */ + public void incrementRejectionCount(long count) { + this.rejectionCount = this.rejectionCount + count; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java new file mode 100644 index 0000000000000..4b9c1b6f3864a --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/AdmissionControllerMode.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/AdmissionControllerMode.java new file mode 100644 index 0000000000000..1aade9f21be46 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/AdmissionControllerMode.java @@ -0,0 +1,66 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.enums; + +import java.util.Locale; + +/** + * Defines the AdmissionControllerMode + */ +public enum AdmissionControllerMode { + /** + * AdmissionController is completely disabled. + */ + DISABLED("disabled"), + + /** + * AdmissionController only monitors the rejection criteria for the requests. + */ + MONITOR("monitor_only"), + + /** + * AdmissionController monitors and rejects tasks that exceed resource usage thresholds. + */ + ENFORCED("enforced"); + + private final String mode; + + /** + * @param mode update mode of the admission controller + */ + AdmissionControllerMode(String mode) { + this.mode = mode; + } + + /** + * + * @return mode of the admission controller + */ + private String getMode() { + return this.mode; + } + + /** + * + * @param name is the mode of the current + * @return Enum of AdmissionControllerMode based on the mode + */ + public static AdmissionControllerMode fromName(String name) { + switch (name.toLowerCase(Locale.ROOT)) { + case "disabled": + return DISABLED; + case "monitor_only": + return MONITOR; + case "enforced": + return ENFORCED; + } + + throw new IllegalArgumentException("Invalid AdmissionControllerMode: " + name); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/TransportRequestURIType.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/TransportRequestURIType.java new file mode 100644 index 0000000000000..e897aca8b3af3 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/enums/TransportRequestURIType.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.enums; + +import org.opensearch.action.bulk.BulkAction; +import org.opensearch.action.search.SearchAction; + +import java.util.Locale; + +public enum TransportRequestURIType { + INDEXING_PRIMARY(BulkAction.NAME.concat("[s][p]")), + + INDEXING_REPLICA(BulkAction.NAME.concat("[s][r]")), + + SEARCH(SearchAction.NAME); + private final String type; + + TransportRequestURIType(String uriType) { + this.type = uriType; + } + + public String getType() { + return type; + } + + public static TransportRequestURIType fromName(String name) { + name = name.toLowerCase(Locale.ROOT); + switch (name) { + case BulkAction.NAME + "[s][p]": + return INDEXING_PRIMARY; + case BulkAction.NAME + "[s][r]": + return INDEXING_REPLICA; + case SearchAction.NAME: + return SEARCH; + } + throw new IllegalArgumentException("Invalid TransportRequestURIType: " + name); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/AdmissionControllerTransportHandler.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/AdmissionControllerTransportHandler.java new file mode 100644 index 0000000000000..74da50ef746ab --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/AdmissionControllerTransportHandler.java @@ -0,0 +1,75 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.handler; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.tasks.Task; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * AdmissionController Handler to intercept Transport Requests. + * @param Transport Request + */ +public class AdmissionControllerTransportHandler implements TransportRequestHandler { + + private final String action; + private final TransportRequestHandler actualHandler; + protected final Logger log = LogManager.getLogger(this.getClass()); + AdmissionControllerService admissionControllerService; + + public AdmissionControllerTransportHandler(String action, TransportRequestHandler actualHandler, AdmissionControllerService admissionControllerService) { + super(); + this.action = action; + this.actualHandler = actualHandler; + this.admissionControllerService = admissionControllerService; + } + + /** + * @param request Transport Request that landed on the node + * @param channel Transport channel allows to send a response to a request + * @param task Current task that is executing + * @throws Exception + */ + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + // intercept all the transport requests here and apply admission control + try { + this.admissionControllerService.applyTransportLayerAdmissionController(); + } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { + channel.sendResponse(openSearchRejectedExecutionException); + throw openSearchRejectedExecutionException; + } catch (final Exception e) { + throw e; + } + this.messageReceivedDecorate(request, actualHandler, channel, task); + } + + /** + * + * @param request Transport Request that landed on the node + * @param actualHandler is the next handler to intercept the request + * @param transportChannel Transport channel allows to send a response to a request + * @param task Current task that is executing + * @throws Exception + */ + protected void messageReceivedDecorate( + final T request, + final TransportRequestHandler actualHandler, + final TransportChannel transportChannel, + Task task + ) throws Exception { + actualHandler.messageReceived(request, transportChannel, task); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/package-info.java new file mode 100644 index 0000000000000..3ce13eef5f9d9 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.handler; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java new file mode 100644 index 0000000000000..a6abeb832a38a --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettings.java new file mode 100644 index 0000000000000..200848f432b0b --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/settings/IOBasedAdmissionControllerSettings.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.settings; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerState; +import org.opensearch.throttling.admissioncontroller.enums.AdmissionControllerMode; + +public class IOBasedAdmissionControllerSettings { + private final AdmissionControllerState admissionControllerState; + public static final Setting IO_BASED_ADMISSION_CONTROLLER_MODE = new Setting<>( + "admission_controller.global_io_usage.mode", + AdmissionControllerSettings.Defaults.MODE, + AdmissionControllerMode::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + // currently limited to one setting will add further more settings in follow-up PR's + public IOBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings, AdmissionControllerState admissionControllerState){ + this.admissionControllerState = admissionControllerState; + this.admissionControllerState.setMode(IO_BASED_ADMISSION_CONTROLLER_MODE.get(settings)); + clusterSettings.addSettingsUpdateConsumer(IO_BASED_ADMISSION_CONTROLLER_MODE, this::setIoBasedAdmissionControllerMode); + } + + private void setIoBasedAdmissionControllerMode(AdmissionControllerMode admissionControllerMode) { + this.admissionControllerState.setMode(admissionControllerMode); + } + + public AdmissionControllerMode getIoBasedAdmissionControllerMode(){ + return this.admissionControllerState.getTransportLayerMode(); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportInterceptor.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportInterceptor.java new file mode 100644 index 0000000000000..f11d755e67cf7 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportInterceptor.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.transport; + +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; +import org.opensearch.throttling.admissioncontroller.handler.AdmissionControllerTransportHandler; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * This class allows throttling to intercept requests on both the sender and the receiver side. + */ +public class AdmissionControllerTransportInterceptor implements TransportInterceptor { + + AdmissionControllerService admissionControllerService; + public AdmissionControllerTransportInterceptor(AdmissionControllerService admissionControllerService){ + this.admissionControllerService = admissionControllerService; + } + + /** + * + * @return admissionController handler to intercept transport requests + */ + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + return new AdmissionControllerTransportHandler<>(action, actualHandler, this.admissionControllerService); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java new file mode 100644 index 0000000000000..733e9647b71e7 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.transport; diff --git a/server/src/main/java/org/opensearch/throttling/package-info.java b/server/src/main/java/org/opensearch/throttling/package-info.java new file mode 100644 index 0000000000000..85a43dc86789d --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling; diff --git a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java index 2e3c98db6fa81..a756c3183e4e9 100644 --- a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java @@ -310,6 +310,7 @@ private NetworkModule newNetworkModule(Settings settings, NetworkPlugin... plugi null, null, null, + null, xContentRegistry(), null, new NullDispatcher(), diff --git a/settings.gradle b/settings.gradle index c04b5997d49b1..009c8b732fdd5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -153,3 +153,6 @@ if (extraProjects.exists()) { addSubProjects('', extraProjectDir) } } +include 'modules:throttling' +findProject(':modules:throttling')?.name = 'throttling' +