Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added changes for AdmissionControl Interceptor and AdmissionControlSe… #11965

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [S3 Repository] Add setting to control connection count for sync client ([#12028](https://github.com/opensearch-project/OpenSearch/pull/12028))
- Add support for Google Application Default Credentials in repository-gcs ([#8394](https://github.com/opensearch-project/OpenSearch/pull/8394))
- New DateTime format for RFC3339 compatible date fields ([#11465](https://github.com/opensearch-project/OpenSearch/pull/11465))
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))

### Dependencies
- Bumps jetty version to 9.4.52.v20230823 to fix GMS-2023-1857 ([#9822](https://github.com/opensearch-project/OpenSearch/pull/9822))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ public abstract class TransportReplicationAction<
Setting.Property.NodeScope
);

/**
* Making primary and replica actions suffixes as constant
*/
public static final String PRIMARY_ACTION_SUFFIX = "[p]";
public static final String REPLICA_ACTION_SUFFIX = "[r]";

protected final ThreadPool threadPool;
protected final TransportService transportService;
protected final ClusterService clusterService;
Expand Down Expand Up @@ -204,8 +210,8 @@ protected TransportReplicationAction(
this.shardStateAction = shardStateAction;
this.executor = executor;

this.transportPrimaryAction = actionName + "[p]";
this.transportReplicaAction = actionName + "[r]";
this.transportPrimaryAction = actionName + PRIMARY_ACTION_SUFFIX;
this.transportReplicaAction = actionName + REPLICA_ACTION_SUFFIX;

this.initialRetryBackoffBound = REPLICATION_INITIAL_RETRY_BACKOFF_BOUND.get(settings);
this.retryTimeout = REPLICATION_RETRY_TIMEOUT.get(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public final class NetworkModule {

private final Map<String, Supplier<Transport>> transportFactories = new HashMap<>();
private final Map<String, Supplier<HttpServerTransport>> transportHttpFactories = new HashMap<>();
private final List<TransportInterceptor> transportIntercetors = new ArrayList<>();
private final List<TransportInterceptor> transportInterceptors = new ArrayList<>();

/**
* Creates a network module that custom networking classes can be plugged into.
Expand All @@ -149,7 +149,8 @@ public NetworkModule(
NetworkService networkService,
HttpServerTransport.Dispatcher dispatcher,
ClusterSettings clusterSettings,
Tracer tracer
Tracer tracer,
List<TransportInterceptor> transportInterceptors
) {
this.settings = settings;
for (NetworkPlugin plugin : plugins) {
Expand Down Expand Up @@ -180,14 +181,18 @@ public NetworkModule(
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
registerTransport(entry.getKey(), entry.getValue());
}
List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(
List<TransportInterceptor> pluginTransportInterceptors = plugin.getTransportInterceptors(
namedWriteableRegistry,
threadPool.getThreadContext()
);
for (TransportInterceptor interceptor : transportInterceptors) {
for (TransportInterceptor interceptor : pluginTransportInterceptors) {
registerTransportInterceptor(interceptor);
}
}
// Adding last because interceptors are triggered from last to first order from the list
if (transportInterceptors != null) {
transportInterceptors.forEach(this::registerTransportInterceptor);
}
}

/** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */
Expand Down Expand Up @@ -264,15 +269,15 @@ public Supplier<Transport> getTransportSupplier() {
* Registers a new {@link TransportInterceptor}
*/
private void registerTransportInterceptor(TransportInterceptor interceptor) {
this.transportIntercetors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
this.transportInterceptors.add(Objects.requireNonNull(interceptor, "interceptor must not be null"));
}

/**
* Returns a composite {@link TransportInterceptor} containing all registered interceptors
* @see #registerTransportInterceptor(TransportInterceptor)
*/
public TransportInterceptor getTransportInterceptor() {
return new CompositeTransportInterceptor(this.transportIntercetors);
return new CompositeTransportInterceptor(this.transportInterceptors);
}

static final class CompositeTransportInterceptor implements TransportInterceptor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@
import org.opensearch.persistent.PersistentTasksClusterService;
import org.opensearch.persistent.decider.EnableAssignmentDecider;
import org.opensearch.plugins.PluginsService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings;
import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings;
import org.opensearch.repositories.fs.FsRepository;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.script.ScriptService;
Expand Down Expand Up @@ -698,7 +700,11 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE,
CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT,
CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT
)
)
);
Expand Down
18 changes: 17 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.plugins.SystemIndexPlugin;
import org.opensearch.plugins.TelemetryPlugin;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.ratelimitting.admissioncontrol.transport.AdmissionControlTransportInterceptor;
import org.opensearch.repositories.RepositoriesModule;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.rest.RestController;
Expand Down Expand Up @@ -896,6 +898,17 @@ protected Node(

final RestController restController = actionModule.getRestController();

final AdmissionControlService admissionControlService = new AdmissionControlService(
settings,
clusterService.getClusterSettings(),
threadPool
);

AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor(
admissionControlService
);

List<TransportInterceptor> transportInterceptors = List.of(admissionControlTransportInterceptor);
final NetworkModule networkModule = new NetworkModule(
settings,
pluginsService.filterPlugins(NetworkPlugin.class),
Expand All @@ -908,8 +921,10 @@ protected Node(
networkService,
restController,
clusterService.getClusterSettings(),
tracer
tracer,
transportInterceptors
);

Collection<UnaryOperator<Map<String, IndexTemplateMetadata>>> indexTemplateMetadataUpgraders = pluginsService.filterPlugins(
Plugin.class
).stream().map(Plugin::getIndexTemplateMetadataUpgrader).collect(Collectors.toList());
Expand Down Expand Up @@ -1186,6 +1201,7 @@ protected Node(
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(AdmissionControlService.class).toInstance(admissionControlService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController;
import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController;
import org.opensearch.threadpool.ThreadPool;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER;

/**
* Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch.
*/
public class AdmissionControlService {
private final ThreadPool threadPool;
public final AdmissionControlSettings admissionControlSettings;
private final ConcurrentMap<String, AdmissionController> ADMISSION_CONTROLLERS;
private static final Logger logger = LogManager.getLogger(AdmissionControlService.class);
private final ClusterSettings clusterSettings;
private final Settings settings;

/**
*
* @param settings Immutable settings instance
* @param clusterSettings ClusterSettings Instance
* @param threadPool ThreadPool Instance
*/
public AdmissionControlService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
this.threadPool = threadPool;
this.admissionControlSettings = new AdmissionControlSettings(clusterSettings, settings);
this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>();
this.clusterSettings = clusterSettings;
this.settings = settings;
this.initialise();
}

/**
* Initialise and Register all the admissionControllers
*/
private void initialise() {
// Initialise different type of admission controllers
registerAdmissionController(CPU_BASED_ADMISSION_CONTROLLER);
}

/**
* Handler to trigger registered admissionController
*/
public void applyTransportAdmissionControl(String action) {
this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action); });
}

/**
*
* @param admissionControllerName admissionControllerName to register into the service.
*/
public void registerAdmissionController(String admissionControllerName) {
AdmissionController admissionController = this.controllerFactory(admissionControllerName);
this.ADMISSION_CONTROLLERS.put(admissionControllerName, admissionController);
}

/**
* @return AdmissionController Instance
*/
private AdmissionController controllerFactory(String admissionControllerName) {
switch (admissionControllerName) {
case CPU_BASED_ADMISSION_CONTROLLER:
return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterSettings);
default:
throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName);
}
}

/**
*
* @return list of the registered admissionControllers
*/
public List<AdmissionController> getAdmissionControllers() {
return new ArrayList<>(this.ADMISSION_CONTROLLERS.values());
}

/**
*
* @param controllerName name of the admissionController
* @return instance of the AdmissionController Instance
*/
public AdmissionController getAdmissionController(String controllerName) {
return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.ratelimitting.admissioncontrol;

import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode;

/**
* Settings related to admission control.
* @opensearch.internal
*/
public final class AdmissionControlSettings {

/**
* Default parameters for the AdmissionControlSettings
*/
public static class Defaults {

Check warning on line 25 in server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettings.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSettings.java#L25

Added line #L25 was not covered by tests
public static final String MODE = "disabled";
}

/**
* Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set
* rejection will be performed, otherwise only rejection metrics will be populated.
*/
public static final Setting<AdmissionControlMode> ADMISSION_CONTROL_TRANSPORT_LAYER_MODE = new Setting<>(
"admission_control.transport.mode",
Defaults.MODE,
AdmissionControlMode::fromName,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private volatile AdmissionControlMode transportLayeradmissionControlMode;

/**
* @param clusterSettings clusterSettings Instance
* @param settings settings instance
*/
public AdmissionControlSettings(ClusterSettings clusterSettings, Settings settings) {
this.transportLayeradmissionControlMode = ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.get(settings);
clusterSettings.addSettingsUpdateConsumer(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE, this::setAdmissionControlTransportLayerMode);
}

/**
*
* @param admissionControlMode update the mode of admission control feature
*/
private void setAdmissionControlTransportLayerMode(AdmissionControlMode admissionControlMode) {
this.transportLayeradmissionControlMode = admissionControlMode;
}

/**
*
* @return return the default mode of the admissionControl
*/
public AdmissionControlMode getAdmissionControlTransportLayerMode() {
return this.transportLayeradmissionControlMode;
}

/**
*
* @return true based on the admission control feature is enforced else false
*/
public Boolean isTransportLayerAdmissionControlEnforced() {
return this.transportLayeradmissionControlMode == AdmissionControlMode.ENFORCED;
}

/**
*
* @return true based on the admission control feature is enabled else false
*/
public Boolean isTransportLayerAdmissionControlEnabled() {
return this.transportLayeradmissionControlMode != AdmissionControlMode.DISABLED;
}
}
Loading
Loading