Skip to content

Commit

Permalink
Addressing Comments
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <[email protected]>
  • Loading branch information
Ajay Kumar Movva committed Oct 19, 2023
1 parent fbf0934 commit 01316da
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,13 @@ public NetworkModule(
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings,
Tracer tracer
Tracer tracer,
List<TransportInterceptor> coreTransportInterceptors
) {
this.settings = settings;
if (coreTransportInterceptors != null) {
coreTransportInterceptors.forEach(this::registerTransportInterceptor);
}
for (NetworkPlugin plugin : plugins) {
Map<String, Supplier<HttpServerTransport>> httpTransportFactory = plugin.getHttpTransports(
settings,
Expand Down Expand Up @@ -267,14 +271,6 @@ private void registerTransportInterceptor(TransportInterceptor interceptor) {
this.transportInterceptors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
}

/**
* Registers a new {@link TransportInterceptor}
* This method used to register CoreInterceptors before the plugin interceptors
*/
public void registerCoreTransportInterceptor(TransportInterceptor interceptor) {
this.transportInterceptors.add(0, Objects.requireNonNull(interceptor, "interceptor must not be null"));
}

/**
* Returns a composite {@link TransportInterceptor} containing all registered interceptors
* @see #registerTransportInterceptor(TransportInterceptor)
Expand Down
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@ protected Node(

final RestController restController = actionModule.getRestController();

List<TransportInterceptor> coreTransportInterceptors = new ArrayList<>();
final AdmissionControlService admissionControlService = new AdmissionControlService(
settings,
clusterService.getClusterSettings(),
Expand All @@ -903,6 +904,7 @@ protected Node(
admissionControlService
);

coreTransportInterceptors.add(admissionControlTransportInterceptor);
final NetworkModule networkModule = new NetworkModule(
settings,
pluginsService.filterPlugins(NetworkPlugin.class),
Expand All @@ -915,9 +917,9 @@ protected Node(
networkService,
restController,
clusterService.getClusterSettings(),
tracer
tracer,
coreTransportInterceptors
);
networkModule.registerCoreTransportInterceptor(admissionControlTransportInterceptor);

Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders = pluginsService.filterPlugins(
Plugin.class
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public AdmissionControlService(Settings settings, ClusterSettings clusterSetting
/**
* Initialise and Register all the admissionControllers
*/
public void initialise() {
private void initialise() {
// Initialise different type of admission controllers
registerAdmissionController(CPU_BASED_ADMISSION_CONTROLLER);
}
Expand All @@ -69,14 +69,14 @@ public void applyTransportAdmissionControl(String action) {
* @param admissionControllerName admissionControllerName to register into the service.
*/
public void registerAdmissionController(String admissionControllerName) {
AdmissionController admissionController = this.getControllerImplementation(admissionControllerName);
AdmissionController admissionController = this.controllerFactory(admissionControllerName);
this.ADMISSION_CONTROLLERS.put(admissionControllerName, admissionController);
}

/**
* @return AdmissionController Instance
*/
private AdmissionController getControllerImplementation(String admissionControllerName) {
private AdmissionController controllerFactory(String admissionControllerName) {
switch (admissionControllerName) {
case CPU_BASED_ADMISSION_CONTROLLER:
return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterSettings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,63 @@

package org.opensearch.ratelimitting.admissioncontrol.controllers;

import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;

import java.util.concurrent.atomic.AtomicLong;

/**
* Interface for Admission Controller in OpenSearch, which aims to provide resource based request admission control.
* Abstract class for Admission Controller in OpenSearch, which aims to provide resource based request admission control.
* It provides methods for any tracking-object that can be incremented (such as memory size),
* and admission control can be applied if configured limit has been reached
*/
public interface AdmissionController {
public abstract class AdmissionController {

private final AtomicLong rejectionCount;
private final String admissionControllerName;

/**
*
* @param rejectionCount initialised rejectionCount value for AdmissionController
* @param admissionControllerName name of the admissionController
*/
public AdmissionController(AtomicLong rejectionCount, String admissionControllerName) {
this.rejectionCount = rejectionCount;
this.admissionControllerName = admissionControllerName;
}

/**
* Return the current state of the admission controller
* @return true if admissionController is enabled for the transport layer else false
*/
boolean isEnabledForTransportLayer();
public boolean isEnabledForTransportLayer(AdmissionControlMode admissionControlMode) {
return admissionControlMode != AdmissionControlMode.DISABLED;
}

/**
* Increment the tracking-objects and apply the admission control if threshold is breached.
* Mostly applicable while applying admission controller
*/
void apply(String action);
public abstract void apply(String action);

/**
* @return name of the admission-controller
*/
String getName();
public String getName() {
return this.admissionControllerName;
}

/**
* Adds the rejection count for the controller. Primarily used when copying controller states.
* @param count To add the value of the tracking resource object as the provided count
*/
void addRejectionCount(long count);
public void addRejectionCount(long count) {
this.rejectionCount.addAndGet(count);
}

/**
* @return current value of the rejection count metric tracked by the admission-controller.
*/
long getRejectionCount();
public long getRejectionCount() {
return this.rejectionCount.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;

import java.util.concurrent.atomic.AtomicLong;
Expand All @@ -21,29 +20,17 @@
* Class for CPU Based Admission Controller in OpenSearch, which aims to provide CPU utilisation admission control.
* It provides methods to apply admission control if configured limit has been reached
*/
public class CPUBasedAdmissionController implements AdmissionController {
public class CPUBasedAdmissionController extends AdmissionController {
private static final Logger LOGGER = LogManager.getLogger(CPUBasedAdmissionController.class);
private final String admissionControllerName;
public CPUBasedAdmissionControllerSettings settings;
private final AtomicLong rejectionCount;

/**
*
* @param admissionControllerName State of the admission controller
*/
public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterSettings clusterSettings) {
this.admissionControllerName = admissionControllerName;
super(new AtomicLong(0), admissionControllerName);
this.settings = new CPUBasedAdmissionControllerSettings(clusterSettings, settings);
this.rejectionCount = new AtomicLong(0);
}

/**
*
* @return true if admissionController is enabled for the transport layer else false
*/
@Override
public boolean isEnabledForTransportLayer() {
return this.settings.getTransportLayerAdmissionControllerMode() != AdmissionControlMode.DISABLED;
}

/**
Expand All @@ -53,40 +40,16 @@ public boolean isEnabledForTransportLayer() {
@Override
public void apply(String action) {
// TODO Will extend this logic further currently just incrementing rejectionCount
if (this.isEnabledForTransportLayer()) {
if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) {
this.applyForTransportLayer(action);
}
}

private void applyForTransportLayer(String actionName) {
// currently incrementing counts to evaluate the controller triggering as expected and using in testing
// currently incrementing counts to evaluate the controller triggering as expected and using in testing so limiting to 10
// TODO will update rejection logic further in next PR's
this.addRejectionCount(1);
}

/**
* @return name of the admission Controller
*/
@Override
public String getName() {
return this.admissionControllerName;
}

/**
* Adds the rejection count for the controller.
*
* @param count the value that needs to be added to total rejection count
*/
@Override
public void addRejectionCount(long count) {
this.rejectionCount.incrementAndGet();
}

/**
* @return current value of the rejection count metric tracked by the admission-controller.
*/
@Override
public long getRejectionCount() {
return this.rejectionCount.get();
if (this.getRejectionCount() < 10) {
this.addRejectionCount(1);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/**
* Settings related to cpu based admission controller.
Expand Down Expand Up @@ -70,20 +69,13 @@ public static class Defaults {
Setting.Property.NodeScope
);

public static final Setting<List<String>> CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST = Setting.listSetting(
"admission_control.global_cpu_usage.actions_list",
Defaults.TRANSPORT_LAYER_DEFAULT_URI_TYPE,
Function.identity(),
Setting.Property.NodeScope
);

// currently limited to one setting will add further more settings in follow-up PR's
public CPUBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Settings settings) {
this.transportLayerMode = CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.get(settings);
clusterSettings.addSettingsUpdateConsumer(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode);
this.searchCPULimit = SEARCH_CPU_USAGE_LIMIT.get(settings);
this.indexingCPULimit = INDEXING_CPU_USAGE_LIMIT.get(settings);
this.transportActionsList = CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST.get(settings);
this.transportActionsList = Defaults.TRANSPORT_LAYER_DEFAULT_URI_TYPE;
clusterSettings.addSettingsUpdateConsumer(INDEXING_CPU_USAGE_LIMIT, this::setIndexingCPULimit);
clusterSettings.addSettingsUpdateConsumer(SEARCH_CPU_USAGE_LIMIT, this::setSearchCPULimit);
}
Expand Down
Loading

0 comments on commit 01316da

Please sign in to comment.