Skip to content

Commit

Permalink
Admission Controller Module Transport Interceptor Initial Commit
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <[email protected]>
  • Loading branch information
Ajay Kumar Movva committed Aug 30, 2023
1 parent 784a473 commit 9a65c7c
Show file tree
Hide file tree
Showing 25 changed files with 984 additions and 6 deletions.
14 changes: 14 additions & 0 deletions modules/throttling/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apply plugin: 'opensearch.java-rest-test'

opensearchplugin {
description 'Plugin intercepting requests and throttle based on resource consumption'
classname 'org.opensearch.throttling.OpenSearchThrottlingModulePlugin'
}

dependencies {
api project(path: ':modules:reindex')
}

testClusters.all {
module ':modules:reindex'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling;

import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerService;
import org.opensearch.throttling.admissioncontroller.transport.AdmissionControllerTransportInterceptor;
import org.opensearch.transport.TransportInterceptor;

import java.util.ArrayList;
import java.util.List;

/**
* This plugin is used to register handlers to intercept both rest and transport requests.
*/
public class OpenSearchThrottlingModulePlugin extends Plugin implements NetworkPlugin {

AdmissionControllerService admissionControllerService;

/**
* Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing
* transport (inter-node) requests. This must not return <code>null</code>
*
* @param namedWriteableRegistry registry of all named writeables registered
* @param threadContext a {@link ThreadContext} of the current nodes or clients that can be used to set additional
* headers in the interceptors
* @param admissionControllerService Services that allowed during the transport interception of requests.
* @return
*/
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext, AdmissionControllerService admissionControllerService) {
this.admissionControllerService = admissionControllerService;
List<TransportInterceptor> interceptors = new ArrayList<>(0);
interceptors.add(new AdmissionControllerTransportInterceptor(this.admissionControllerService));
return interceptors;
}
}
39 changes: 39 additions & 0 deletions modules/throttling/src/main/plugin-metadata/plugin-security.policy
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

grant {
// needed to generate runtime classes
permission java.lang.RuntimePermission "createClassLoader";

// needed to find the classloader to load allowlisted classes from
permission java.lang.RuntimePermission "getClassLoader";
};
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,8 @@
import org.opensearch.tasks.RawTaskStatus;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportInterceptor;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerService;
import org.opensearch.transport.*;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -143,6 +141,7 @@ public NetworkModule(
BigArrays bigArrays,
PageCacheRecycler pageCacheRecycler,
CircuitBreakerService circuitBreakerService,
AdmissionControllerService admissionControllerService,
NamedWriteableRegistry namedWriteableRegistry,
NamedXContentRegistry xContentRegistry,
NetworkService networkService,
Expand Down Expand Up @@ -178,8 +177,13 @@ public NetworkModule(
}
List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(
namedWriteableRegistry,
threadPool.getThreadContext()
threadPool.getThreadContext(),
admissionControllerService
);
transportInterceptors.addAll(plugin.getTransportInterceptors(
namedWriteableRegistry,
threadPool.getThreadContext()
));
for (TransportInterceptor interceptor : transportInterceptors) {
registerTransportInterceptor(interceptor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings;
import org.opensearch.throttling.admissioncontroller.controllers.IOBasedAdmissionController;
import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings;
import org.opensearch.transport.ProxyConnectionStrategy;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.RemoteConnectionStrategy;
Expand Down Expand Up @@ -233,6 +236,10 @@ public void apply(Settings value, Settings current, Settings previous) {
public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_URI_LIST_SETTING,
AdmissionControllerSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING,
IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_MODE,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ public Builder copy(String key, String sourceKey, Settings source) {
/**
* Sets a null value for the given setting key
*/
public Builder putNull(String key) {
public Builder putNull(String key) {
return put(key, (String) null);
}

Expand Down
13 changes: 13 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerService;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportInterceptor;
Expand Down Expand Up @@ -749,6 +750,12 @@ protected Node(
fileCacheCleaner
);

final AdmissionControllerService admissionControllerService = new AdmissionControllerService(
settings,
clusterService.getClusterSettings(),
threadPool
);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down Expand Up @@ -827,6 +834,7 @@ protected Node(
bigArrays,
pageCacheRecycler,
circuitBreakerService,
admissionControllerService,
namedWriteableRegistry,
xContentRegistry,
networkService,
Expand Down Expand Up @@ -1094,6 +1102,7 @@ protected Node(
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(AdmissionControllerService.class).toInstance(admissionControllerService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
Expand Down Expand Up @@ -1263,6 +1272,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(RepositoriesService.class).start();
injector.getInstance(SearchService.class).start();
injector.getInstance(FsHealthService.class).start();
injector.getInstance(AdmissionControllerService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();
Expand Down Expand Up @@ -1418,6 +1428,7 @@ private Node stop() {
injector.getInstance(ClusterService.class).stop();
injector.getInstance(NodeConnectionsService.class).stop();
injector.getInstance(FsHealthService.class).stop();
injector.getInstance(AdmissionControllerService.class).stop();
nodeService.getMonitorService().stop();
nodeService.getSearchBackpressureService().stop();
injector.getInstance(GatewayService.class).stop();
Expand Down Expand Up @@ -1481,6 +1492,8 @@ public synchronized void close() throws IOException {
toClose.add(nodeService.getSearchBackpressureService());
toClose.add(() -> stopWatch.stop().start("fsHealth"));
toClose.add(injector.getInstance(FsHealthService.class));
toClose.add(() -> stopWatch.stop().start("admission_controller"));
toClose.add(injector.getInstance(AdmissionControllerService.class));
toClose.add(() -> stopWatch.stop().start("gateway"));
toClose.add(injector.getInstance(GatewayService.class));
toClose.add(() -> stopWatch.stop().start("search"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.http.HttpServerTransport;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportInterceptor;

Expand Down Expand Up @@ -72,6 +73,14 @@ default List<TransportInterceptor> getTransportInterceptors(
return Collections.emptyList();
}

default List<TransportInterceptor> getTransportInterceptors(
NamedWriteableRegistry namedWriteableRegistry,
ThreadContext threadContext,
AdmissionControllerService admissionControllerService
) {
return Collections.emptyList();
}

/**
* Returns a map of {@link Transport} suppliers.
* See {@link org.opensearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
Expand Down
Loading

0 comments on commit 9a65c7c

Please sign in to comment.