Skip to content

Commit

Permalink
Admission Controller Module Transport Interceptor Initial Commit
Browse files Browse the repository at this point in the history
Signed-off-by: Ajay Kumar Movva <[email protected]>
  • Loading branch information
Ajay Kumar Movva committed Sep 3, 2023
1 parent e98ded6 commit f60c6b5
Show file tree
Hide file tree
Showing 29 changed files with 1,736 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107))
- Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679))
- Added encryption-sdk lib to provide encryption and decryption capabilities ([#8466](https://github.com/opensearch-project/OpenSearch/pull/8466))
- Admission Controller Module Transport Interceptor Initial Commit ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
14 changes: 14 additions & 0 deletions modules/throttling/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
apply plugin: 'opensearch.java-rest-test'

opensearchplugin {
description 'Plugin intercepting requests and throttle based on resource consumption'
classname 'org.opensearch.throttling.OpenSearchThrottlingModulePlugin'
}

dependencies {
api project(path: ':modules:reindex')
}

testClusters.all {
module ':modules:reindex'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling;

import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerService;
import org.opensearch.throttling.admissioncontroller.transport.AdmissionControllerTransportInterceptor;
import org.opensearch.transport.TransportInterceptor;

import java.util.ArrayList;
import java.util.List;

/**
* This plugin is used to register handlers to intercept both rest and transport requests.
*/
public class OpenSearchThrottlingModulePlugin extends Plugin implements NetworkPlugin {

AdmissionControllerService admissionControllerService;

/**
* Default Constructor for plugin
*/
public OpenSearchThrottlingModulePlugin() {}

/**
* Returns a list of {@link TransportInterceptor} instances that are used to intercept incoming and outgoing
* transport (inter-node) requests. This must not return <code>null</code>
*
* @param namedWriteableRegistry registry of all named writeables registered
* @param threadContext a {@link ThreadContext} of the current nodes or clients that can be used to set additional
* headers in the interceptors
* @return list of transport interceptors
*/
@Override
public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
this.admissionControllerService = AdmissionControllerService.getInstance();
List<TransportInterceptor> interceptors = new ArrayList<>(0);
// TODO Will throw exception in next PR's. This needs to ensure the service is up before adding into transport interceptor.
if (this.admissionControllerService != null) {
interceptors.add(new AdmissionControllerTransportInterceptor(this.admissionControllerService));
}
return interceptors;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/**
* Package contains classes related to throttling plugins
*/
package org.opensearch.throttling;
39 changes: 39 additions & 0 deletions modules/throttling/src/main/plugin-metadata/plugin-security.policy
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

grant {
// needed to generate runtime classes
permission java.lang.RuntimePermission "createClassLoader";

// needed to find the classloader to load allowlisted classes from
permission java.lang.RuntimePermission "getClassLoader";
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.throttling;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerService;
import org.opensearch.throttling.admissioncontroller.transport.AdmissionControllerTransportInterceptor;
import org.opensearch.transport.TransportInterceptor;

import java.util.List;

import static org.mockito.Mockito.mock;

public class OpenSearchThrottlingModulePluginTests extends OpenSearchTestCase {
OpenSearchThrottlingModulePlugin openSearchThrottlingModulePlugin;
private ClusterService clusterService;
private ThreadPool threadPool;

@Override
public void setUp() throws Exception {
threadPool = new TestThreadPool("admission_controller_settings_test");
clusterService = new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
threadPool
);
super.setUp();
}

@Override
public void tearDown() throws Exception {
super.tearDown();
threadPool.shutdownNow();
}

public void testGetTransportInterceptors() {
openSearchThrottlingModulePlugin = new OpenSearchThrottlingModulePlugin();
List<TransportInterceptor> interceptors = openSearchThrottlingModulePlugin.getTransportInterceptors(
mock(NamedWriteableRegistry.class),
threadPool.getThreadContext()
);
assertEquals(interceptors.size(), 0);
AdmissionControllerService admissionControllerService = AdmissionControllerService.newAdmissionControllerService(
Settings.EMPTY,
clusterService.getClusterSettings(),
threadPool
);
interceptors = openSearchThrottlingModulePlugin.getTransportInterceptors(
mock(NamedWriteableRegistry.class),
threadPool.getThreadContext()
);
assertEquals(interceptors.size(), 1);
assertEquals(interceptors.get(0).getClass(), AdmissionControllerTransportInterceptor.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerSettings;
import org.opensearch.throttling.admissioncontroller.settings.IOBasedAdmissionControllerSettings;
import org.opensearch.transport.ProxyConnectionStrategy;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.RemoteConnectionStrategy;
Expand Down Expand Up @@ -237,6 +239,10 @@ public void apply(Settings value, Settings current, Settings previous) {
public static Set<Setting<?>> BUILT_IN_CLUSTER_SETTINGS = Collections.unmodifiableSet(
new HashSet<>(
Arrays.asList(
AdmissionControllerSettings.ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
AdmissionControllerSettings.ADMISSION_CONTROLLER_FORCE_ENABLE_DEFAULT_SETTING,
IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE,
IOBasedAdmissionControllerSettings.IO_BASED_ADMISSION_CONTROLLER_TRANSPORT_URI_LIST,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING,
AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING,
AwarenessReplicaBalance.CLUSTER_ROUTING_ALLOCATION_AWARENESS_BALANCE_SETTING,
Expand Down
8 changes: 8 additions & 0 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.throttling.admissioncontroller.AdmissionControllerService;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.Transport;
import org.opensearch.transport.TransportInterceptor;
Expand Down Expand Up @@ -749,6 +750,12 @@ protected Node(
fileCacheCleaner
);

final AdmissionControllerService admissionControllerService = AdmissionControllerService.newAdmissionControllerService(
settings,
clusterService.getClusterSettings(),
threadPool
);

final AliasValidator aliasValidator = new AliasValidator();

final ShardLimitValidator shardLimitValidator = new ShardLimitValidator(settings, clusterService, systemIndices);
Expand Down Expand Up @@ -1094,6 +1101,7 @@ protected Node(
b.bind(IndexingPressureService.class).toInstance(indexingPressureService);
b.bind(TaskResourceTrackingService.class).toInstance(taskResourceTrackingService);
b.bind(SearchBackpressureService.class).toInstance(searchBackpressureService);
b.bind(AdmissionControllerService.class).toInstance(admissionControllerService);
b.bind(UsageService.class).toInstance(usageService);
b.bind(AggregationUsageService.class).toInstance(searchModule.getValuesSourceRegistry().getUsageService());
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
Expand Down
Loading

0 comments on commit f60c6b5

Please sign in to comment.