From 06a4ff45c9f23bfc3c09c995663bbfc087b68c0c Mon Sep 17 00:00:00 2001 From: Ajay Kumar Movva Date: Wed, 23 Aug 2023 01:55:40 +0530 Subject: [PATCH] Admission Controller Module Transport Interceptor Initial Commit Signed-off-by: Ajay Kumar Movva --- modules/throttling/build.gradle | 14 ++ .../OpenSearchThrottlingModulePlugin.java | 41 +++++ .../plugin-metadata/plugin-security.policy | 39 +++++ .../common/settings/ClusterSettings.java | 3 + .../main/java/org/opensearch/node/Node.java | 12 ++ .../AdmissionControllerMode.java | 73 +++++++++ .../AdmissionControllerService.java | 145 ++++++++++++++++++ .../AdmissionControllerSettings.java | 102 ++++++++++++ .../AdmissionControllerState.java | 96 ++++++++++++ .../controllers/AdmissionController.java | 44 ++++++ .../IOBasedAdmissionController.java | 64 ++++++++ .../controllers/package-info.java | 9 ++ .../AdmissionControllerTransportHandler.java | 74 +++++++++ .../handler/package-info.java | 9 ++ .../admissioncontroller/package-info.java | 9 ++ ...missionControllerTransportInterceptor.java | 34 ++++ .../transport/package-info.java | 9 ++ .../opensearch/throttling/package-info.java | 9 ++ settings.gradle | 3 + 19 files changed, 789 insertions(+) create mode 100644 modules/throttling/build.gradle create mode 100644 modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java create mode 100644 modules/throttling/src/main/plugin-metadata/plugin-security.policy create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerMode.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerService.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettings.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerState.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/AdmissionControllerTransportHandler.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/package-info.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportInterceptor.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java create mode 100644 server/src/main/java/org/opensearch/throttling/package-info.java diff --git a/modules/throttling/build.gradle b/modules/throttling/build.gradle new file mode 100644 index 0000000000000..40fef23c137e4 --- /dev/null +++ b/modules/throttling/build.gradle @@ -0,0 +1,14 @@ +apply plugin: 'opensearch.java-rest-test' + +opensearchplugin { + description 'Plugin intercepting requests and throttle based on resource consumption' + classname 'org.opensearch.throttling.OpenSearchThrottlingModulePlugin' +} + +dependencies { + api project(path: ':modules:reindex') +} + +testClusters.all { + module ':modules:reindex' +} diff --git a/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java b/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java new file mode 100644 index 0000000000000..c67198dafab53 --- /dev/null +++ b/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling; + +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.throttling.admissioncontroller.transport.AdmissionControllerTransportInterceptor; +import org.opensearch.transport.TransportInterceptor; + +import java.util.ArrayList; +import java.util.List; + +/** + * This plugin is used to register handlers to intercept both rest and transport requests. + */ +public class OpenSearchThrottlingModulePlugin extends Plugin implements NetworkPlugin { + + /** + * Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing + * transport (inter-node) requests. This must not return null + * + * @param namedWriteableRegistry registry of all named writeables registered + * @param threadContext a {@link ThreadContext} of the current nodes or clients that can be used to set additional + * headers in the interceptors + * @return + */ + @Override + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { + List interceptors = new ArrayList<>(0); + interceptors.add(new AdmissionControllerTransportInterceptor()); + return interceptors; + } +} diff --git a/modules/throttling/src/main/plugin-metadata/plugin-security.policy b/modules/throttling/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 0000000000000..ccfd6ba70dd16 --- /dev/null +++ b/modules/throttling/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +grant { + // needed to generate runtime classes + permission java.lang.RuntimePermission "createClassLoader"; + + // needed to find the classloader to load allowlisted classes from + permission java.lang.RuntimePermission "getClassLoader"; +}; diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 32d14a3519659..f01ee2755948b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -148,6 +148,7 @@ import org.opensearch.tasks.consumer.TopNSearchTasksLogger; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings; import org.opensearch.transport.ProxyConnectionStrategy; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.RemoteConnectionStrategy; @@ -233,6 +234,8 @@ public void apply(Settings value, Settings current, Settings previous) { public static Set> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet( new HashSet<>( Arrays.asList( + AdmissionControllerSettings.ADMISSION_CONTROLLER_URI_LIST_SETTING, + AdmissionControllerSettings.ADMISSION_CONTROLLER_MODE, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING, AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING, AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING, diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 51cc7c9007159..942c1b38310f8 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -222,6 +222,7 @@ import org.opensearch.threadpool.ExecutorBuilder; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; import org.opensearch.transport.RemoteClusterService; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportInterceptor; @@ -904,6 +905,12 @@ protected Node( transportService.getTaskManager() ); + final AdmissionControllerService admissionControllerService = AdmissionControllerService.newAdmissionControllerService( + settings, + clusterService, + threadPool + ); + final RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings()); RepositoriesModule repositoriesModule = new RepositoriesModule( this.environment, @@ -1094,6 +1101,7 @@ protected Node( b.bind(IndexingPressureService.class).toInstance(indexingPressureService); b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService); b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService); + b.bind(AdmissionControllerService.class).toInstance(admissionControllerService); b.bind(UsageService.class).toInstance(usageService); b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService()); b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); @@ -1263,6 +1271,7 @@ public Node start() throws NodeValidationException { injector.getInstance(RepositoriesService.class).start(); injector.getInstance(SearchService.class).start(); injector.getInstance(FsHealthService.class).start(); + injector.getInstance(AdmissionControllerService.class).start(); nodeService.getMonitorService().start(); nodeService.getSearchBackpressureService().start(); nodeService.getTaskCancellationMonitoringService().start(); @@ -1418,6 +1427,7 @@ private Node stop() { injector.getInstance(ClusterService.class).stop(); injector.getInstance(NodeConnectionsService.class).stop(); injector.getInstance(FsHealthService.class).stop(); + injector.getInstance(AdmissionControllerService.class).stop(); nodeService.getMonitorService().stop(); nodeService.getSearchBackpressureService().stop(); injector.getInstance(GatewayService.class).stop(); @@ -1481,6 +1491,8 @@ public synchronized void close() throws IOException { toClose.add(nodeService.getSearchBackpressureService()); toClose.add(() -> stopWatch.stop().start("fsHealth")); toClose.add(injector.getInstance(FsHealthService.class)); + toClose.add(() -> stopWatch.stop().start("admission_controller")); + toClose.add(injector.getInstance(AdmissionControllerService.class)); toClose.add(() -> stopWatch.stop().start("gateway")); toClose.add(injector.getInstance(GatewayService.class)); toClose.add(() -> stopWatch.stop().start("search")); diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerMode.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerMode.java new file mode 100644 index 0000000000000..80f0cb1e694e8 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerMode.java @@ -0,0 +1,73 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import java.util.Locale; + +/** + * Defines the AdmissionControllerMode + */ +public enum AdmissionControllerMode { + /** + * AdmissionController is completely disabled. + */ + DISABLED("disabled"), + + /** + * AdmissionController only monitors the rejection criteria for the requests. + */ + MONITOR("monitor_only"), + + /** + * AdmissionController monitors and rejects tasks that exceed resource usage thresholds. + */ + ENFORCED("enforced"), + + /** + * AdmissionController monitors and rejects only X percent of the requests that exceed resource usage thresholds. + */ + HALF_OPEN("half_open"); + + private final String mode; + + /** + * @param mode update mode of the admission controller + */ + AdmissionControllerMode(String mode) { + this.mode = mode; + } + + /** + * + * @return mode of the admission controller + */ + private String getMode() { + return this.mode; + } + + /** + * + * @param name is the mode of the current + * @return Enum of AdmissionControllerMode based on the mode + */ + public static AdmissionControllerMode fromName(String name) { + switch (name.toLowerCase(Locale.ROOT)) { + case "disabled": + return DISABLED; + case "monitor_only": + return MONITOR; + case "enforced": + return ENFORCED; + case "half_open": + return HALF_OPEN; + } + + throw new IllegalArgumentException("Invalid AdmissionControllerMode: " + name); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerService.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerService.java new file mode 100644 index 0000000000000..96e265f612393 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerService.java @@ -0,0 +1,145 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.Settings; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.throttling.admissioncontroller.controllers.AdmissionController; +import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController; + +import java.io.IOException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch. + */ +public class AdmissionControllerService extends AbstractLifecycleComponent { + private final ThreadPool threadPool; + private final AdmissionControllerSettings admissionControllerSettings; + private final ConcurrentMap ADMISSION_CONTROLLERS; + + private static AdmissionControllerService admissionControllerService = null; + + public static final String IO_BASED_ADMISSION_CONTROLLER = "global_io_usage"; + + /** + * + * @param settings Immutable settings instance + * @param clusterService ClusterService Instance + * @param threadPool ThreadPool Instance + */ + private AdmissionControllerService(Settings settings, ClusterService clusterService, ThreadPool threadPool) { + this.threadPool = threadPool; + this.admissionControllerSettings = new AdmissionControllerSettings(clusterService, settings); + this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>(); + } + + /** + * Initialise and Register all the admissionControllers + */ + private void initialise() { + // Initialise different type of admission controllers + AdmissionControllerState ioBasedAdmissionControllerState = new AdmissionControllerState( + IO_BASED_ADMISSION_CONTROLLER, + AdmissionControllerMode.MONITOR, + this.admissionControllerSettings.getDefaultUriList() + ); + registerAdmissionController(ioBasedAdmissionControllerState); + } + + /** + * Handler to trigger registered admissionController + */ + public void applyAdmissionController() { + if (this.isAdmissionControllerEnabled()) { + this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.acquire(); }); + } + } + + /** + * + */ + @Override + protected void doStart() { + this.initialise(); + } + + /** + * + * @return singleton admissionControllerService Instance + */ + public static AdmissionControllerService getInstance() { + return admissionControllerService; + } + + /** + * + * @param settings Immutable settings instance + * @param clusterService ClusterService Instance + * @param threadPool ThreadPool Instance + * @return singleton admissionControllerService Instance + */ + public static synchronized AdmissionControllerService newAdmissionControllerService( + Settings settings, + ClusterService clusterService, + ThreadPool threadPool + ) { + if (admissionControllerService == null) { + admissionControllerService = new AdmissionControllerService(settings, clusterService, threadPool); + } + return admissionControllerService; + } + + @Override + protected void doStop() {} + + /** + * @throws IOException + */ + @Override + protected void doClose() throws IOException {} + + /** + * + * @return true if the admissionController Feature is enabled + */ + public Boolean isAdmissionControllerEnabled() { + return this.admissionControllerSettings.isAdmissionControllerEnabled(); + } + + /** + * + * @param admissionControllerState admissionControllerState to register into the service. + */ + private void registerAdmissionController(AdmissionControllerState admissionControllerState) { + if (!this.ADMISSION_CONTROLLERS.containsKey(admissionControllerState.getControllerName())) { + AdmissionController admissionController = controllerFactory(admissionControllerState); + if (admissionController != null) { + this.ADMISSION_CONTROLLERS.put(admissionControllerState.getControllerName(), admissionController); + } + } + } + + /** + * + * @param admissionControllerState admissionControllerState to create instance + * @return AdmissionController Instance + */ + private static AdmissionController controllerFactory(AdmissionControllerState admissionControllerState) { + switch (admissionControllerState.getControllerName()) { + case IO_BASED_ADMISSION_CONTROLLER: + return new IOBasedAdmissionController(admissionControllerState); + default: + return null; + } + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettings.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettings.java new file mode 100644 index 0000000000000..e75c6e5c48453 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerSettings.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; + +/** + * Settings related to admission controller. + * @opensearch.internal + */ +public final class AdmissionControllerSettings { + + private static class Defaults { + private static final String MODE = "disabled"; + private static final List DEFAULT_URI_LIST_SETTING = Arrays.asList("_bulk", "_search"); + } + + /** + * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set + * rejection will be performed, otherwise only rejection metrics will be populated. + */ + public static final Setting ADMISSION_CONTROLLER_MODE = new Setting<>( + "admission_controller.mode", + Defaults.MODE, + AdmissionControllerMode::fromName, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * This setting provides the selectness of URI patterns which consumers can Match and + * filter the application of Admission controller + */ + public static final Setting> ADMISSION_CONTROLLER_URI_LIST_SETTING = Setting.listSetting( + "admission_controller.uri_list", + Defaults.DEFAULT_URI_LIST_SETTING, + Function.identity(), + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + private volatile AdmissionControllerMode admissionControllerMode; + + private final List defaultAdmissionControllerURIList; + + /** + * + * @param clusterService ClusterService Instance + * @param settings + */ + public AdmissionControllerSettings(ClusterService clusterService, Settings settings) { + ClusterSettings clusterSettings = clusterService.getClusterSettings(); + this.admissionControllerMode = ADMISSION_CONTROLLER_MODE.get(settings); + this.defaultAdmissionControllerURIList = ADMISSION_CONTROLLER_URI_LIST_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer(ADMISSION_CONTROLLER_MODE, this::setAdmissionControllerMode); + } + + /** + * + * @param admissionControllerMode update the mode of admission controller feature + */ + private void setAdmissionControllerMode(AdmissionControllerMode admissionControllerMode) { + this.admissionControllerMode = admissionControllerMode; + } + + /** + * + * @return boolean based on the admission controller feature is enabled or not + */ + public Boolean isAdmissionControllerEnabled() { + return this.admissionControllerMode != AdmissionControllerMode.DISABLED; + } + + /** + * + * @return boolean based on the admission controller feature is enforced or not + */ + public Boolean isAdmissionControllerEnforced() { + return this.admissionControllerMode == AdmissionControllerMode.ENFORCED; + } + + /** + * + * @return list of the default URI admission controller feature is applied + */ + public List getDefaultUriList() { + return this.defaultAdmissionControllerURIList; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerState.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerState.java new file mode 100644 index 0000000000000..661bbbc3661bb --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerState.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import java.util.List; + +/** + * Defines the current state of the admission controller + */ +public class AdmissionControllerState { + private AdmissionControllerMode mode; + private List applyURIList; + private String controllerName; + + private long rejectionCount; + + /** + * + * @param controllerName Name of the admission controller + * @param admissionControllerMode Mode of the admission controller + * @param applyURIList URI list for which the admission controller need to be applied. + */ + public AdmissionControllerState(String controllerName, AdmissionControllerMode admissionControllerMode, List applyURIList) { + this.applyURIList = applyURIList; + this.mode = admissionControllerMode; + this.controllerName = controllerName; + this.rejectionCount = 0; + } + + /** + * + * @return mode of the admission-controller + */ + public AdmissionControllerMode getMode() { + return mode; + } + + /** + * @param mode Update the mode for admission controller + */ + public void setMode(AdmissionControllerMode mode) { + this.mode = mode; + } + + /** + * + * @return list of the URI where the admission controller needs to applied + */ + public List getApplyURIList() { + return applyURIList; + } + + /** + * + * @param applyURIList update the URI list where the admission controller needs to applied + */ + public void setApplyURIList(List applyURIList) { + this.applyURIList = applyURIList; + } + + /** + * + * @return name of the admission controller + */ + public String getControllerName() { + return controllerName; + } + + /** + * @param controllerName update the name of the admission controller + */ + private void setControllerName(String controllerName) { + this.controllerName = controllerName; + } + + /** + * @return rejection count of the admission controller + */ + public long getRejectionCount() { + return rejectionCount; + } + + /** + * + * @param count update the count of the rejectionCount + */ + public void incrementRejectionCount(long count) { + this.rejectionCount = this.rejectionCount + count; + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java new file mode 100644 index 0000000000000..16ea13c5f1cb0 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/AdmissionController.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; + +import org.opensearch.throttling.admissioncontroller.AdmissionControllerState; + +/** + * Interface for Admission Controller in OpenSearch, which aims to provide resource based request admission control. + * It provides methods for any tracking-object that can be incremented (such as memory size), + * and admission control can be applied if configured limit has been reached + */ +public interface AdmissionController { + + /** + * Return the current state of the admission controller + * @return + */ + AdmissionControllerState getAdmissionControllerState(); + + /** + * Increment the tracking-objects and apply the admission control if threshold is breached. + * Mostly applicable while acquiring the quota. + * @return boolean if admission controller able to acquire the requested quota. + */ + boolean acquire(); + + /** + * Decrement the tracking-objects and do not apply the admission control. + * Mostly applicable while remitting the quota. + * @return boolean if admission controller is released the acquired quota. + */ + boolean release(); + + /** + * @return name of the admission-controller + */ + String getName(); +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java new file mode 100644 index 0000000000000..9094b66d4ee35 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/IOBasedAdmissionController.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerState; + +/** + * Class for IO Based Admission Controller in OpenSearch, which aims to provide IO utilisation admission control. + * It provides methods to apply admission control if configured limit has been reached + */ +public class IOBasedAdmissionController implements AdmissionController { + private static final Logger LOGGER = LogManager.getLogger(IOBasedAdmissionController.class); + private final AdmissionControllerState admissionControllerState; + + /** + * + * @param admissionControllerState State of the admission controller + */ + public IOBasedAdmissionController(AdmissionControllerState admissionControllerState) { + this.admissionControllerState = admissionControllerState; + } + + /** + * @return admissionControllerState + */ + @Override + public AdmissionControllerState getAdmissionControllerState() { + return this.admissionControllerState; + } + + /** + * This function will take of applying admission controller based on IO usage + * @return Boolean if the admission controller is applied successfully + */ + @Override + public boolean acquire() { + return false; + } + + /** + * @return Boolean if the admission controller cleared the objects that acquired + */ + @Override + public boolean release() { + return false; + } + + /** + * @return name of the admission Controller + */ + @Override + public String getName() { + return this.admissionControllerState.getControllerName(); + } + +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java new file mode 100644 index 0000000000000..4b9c1b6f3864a --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/controllers/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.controllers; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/AdmissionControllerTransportHandler.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/AdmissionControllerTransportHandler.java new file mode 100644 index 0000000000000..5088d1b581f90 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/AdmissionControllerTransportHandler.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.handler; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.tasks.Task; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerService; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * AdmissionController Handler to intercept Transport Requests. + * @param Transport Request + */ +public class AdmissionControllerTransportHandler implements TransportRequestHandler { + + private final String action; + private final TransportRequestHandler actualHandler; + protected final Logger log = LogManager.getLogger(this.getClass()); + public AdmissionControllerService admissionControllerService; + + public AdmissionControllerTransportHandler(String action, TransportRequestHandler actualHandler) { + super(); + this.action = action; + this.actualHandler = actualHandler; + this.admissionControllerService = AdmissionControllerService.getInstance(); + } + + /** + * @param request Transport Request that landed on the node + * @param channel Transport channel allows to send a response to a request + * @param task Current task that is executing + * @throws Exception + */ + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + // intercept all the transport requests here and apply admission control + try { + this.admissionControllerService.applyAdmissionController(); + } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { + channel.sendResponse(openSearchRejectedExecutionException); + throw openSearchRejectedExecutionException; + } catch (final Exception e) { + throw e; + } + this.messageReceivedDecorate(request, actualHandler, channel, task); + } + + /** + * + * @param request Transport Request that landed on the node + * @param actualHandler is the next handler to intercept the request + * @param transportChannel Transport channel allows to send a response to a request + * @param task Current task that is executing + * @throws Exception + */ + protected void messageReceivedDecorate( + final T request, + final TransportRequestHandler actualHandler, + final TransportChannel transportChannel, + Task task + ) throws Exception { + actualHandler.messageReceived(request, transportChannel, task); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/package-info.java new file mode 100644 index 0000000000000..3ce13eef5f9d9 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/handler/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.handler; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java new file mode 100644 index 0000000000000..a6abeb832a38a --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportInterceptor.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportInterceptor.java new file mode 100644 index 0000000000000..ffdc9dab2231e --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/AdmissionControllerTransportInterceptor.java @@ -0,0 +1,34 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.transport; + +import org.opensearch.throttling.admissioncontroller.handler.AdmissionControllerTransportHandler; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +/** + * This class allows throttling to intercept requests on both the sender and the receiver side. + */ +public class AdmissionControllerTransportInterceptor implements TransportInterceptor { + + /** + * + * @return admissionController handler to intercept transport requests + */ + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + return new AdmissionControllerTransportHandler<>(action, actualHandler); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java new file mode 100644 index 0000000000000..733e9647b71e7 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/transport/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller.transport; diff --git a/server/src/main/java/org/opensearch/throttling/package-info.java b/server/src/main/java/org/opensearch/throttling/package-info.java new file mode 100644 index 0000000000000..85a43dc86789d --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling; diff --git a/settings.gradle b/settings.gradle index c04b5997d49b1..009c8b732fdd5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -153,3 +153,6 @@ if (extraProjects.exists()) { addSubProjects('', extraProjectDir) } } +include 'modules:throttling' +findProject(':modules:throttling')?.name = 'throttling' +