From babe2118a96bf6780a7ddd4aa7d82a022adab21e Mon Sep 17 00:00:00 2001 From: Ajay Kumar Movva Date: Sun, 13 Aug 2023 00:09:14 +0530 Subject: [PATCH] Admission Controller Module Rest and Transport Interceptor Commit Signed-off-by: Ajay Kumar Movva --- modules/throttling/build.gradle | 17 +++++ .../OpenSearchThrottlingModulePlugin.java | 44 +++++++++++++ .../AdmissionControllerRestHandler.java | 63 +++++++++++++++++++ .../plugin-metadata/plugin-security.policy | 39 ++++++++++++ ...pensearch.transport.Netty4HandlerExtension | 8 +++ .../netty4/Netty4HttpServerTransport.java | 31 ++++++++- .../transport/Netty4HandlerExtension.java | 20 ++++++ .../transport/Netty4ModulePlugin.java | 32 ++++++++-- .../opensearch/transport/NettySettings.java | 27 ++++++++ .../AdmissionControllerTransportHandler.java | 62 ++++++++++++++++++ ...missionControllerTransportInterceptor.java | 33 ++++++++++ .../admissioncontroller/package-info.java | 9 +++ .../opensearch/throttling/package-info.java | 9 +++ settings.gradle | 3 + 14 files changed, 390 insertions(+), 7 deletions(-) create mode 100644 modules/throttling/build.gradle create mode 100644 modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java create mode 100644 modules/throttling/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerRestHandler.java create mode 100644 modules/throttling/src/main/plugin-metadata/plugin-security.policy create mode 100644 modules/throttling/src/main/resources/META-INF/services/org.opensearch.transport.Netty4HandlerExtension create mode 100644 modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4HandlerExtension.java create mode 100644 modules/transport-netty4/src/main/java/org/opensearch/transport/NettySettings.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerTransportHandler.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerTransportInterceptor.java create mode 100644 server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java create mode 100644 server/src/main/java/org/opensearch/throttling/package-info.java diff --git a/modules/throttling/build.gradle b/modules/throttling/build.gradle new file mode 100644 index 0000000000000..cfb9291a110e6 --- /dev/null +++ b/modules/throttling/build.gradle @@ -0,0 +1,17 @@ +apply plugin: 'opensearch.java-rest-test' + +opensearchplugin { + description 'Plugin intercepting requests and throttle based on resource consumption' + classname 'org.opensearch.throttling.OpenSearchThrottlingModulePlugin' + extendedPlugins = ['transport-netty4'] +} + +dependencies { + api project(path: ':modules:reindex') + implementation project(path: ':modules:transport-netty4') + compileOnly project(':modules:transport-netty4') +} + +testClusters.all { + module ':modules:reindex' +} diff --git a/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java b/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java new file mode 100644 index 0000000000000..c24a7cd46e52e --- /dev/null +++ b/modules/throttling/src/main/java/org/opensearch/throttling/OpenSearchThrottlingModulePlugin.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling; + +import io.netty.channel.ChannelHandler; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerRestHandler; +import org.opensearch.throttling.admissioncontroller.AdmissionControllerTransportInterceptor; +import org.opensearch.transport.Netty4HandlerExtension; +import org.opensearch.transport.TransportInterceptor; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class OpenSearchThrottlingModulePlugin extends Plugin implements NetworkPlugin, Netty4HandlerExtension { + + private static final Map HANDLERS = new HashMap(); + + @Override + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { + List interceptors = new ArrayList<>(0); + interceptors.add(new AdmissionControllerTransportInterceptor(null)); + return interceptors; + } + + @Override + public Map getHandlers() { + if (HANDLERS.isEmpty()) { + HANDLERS.put("opensearch-throttling-plugin:AdmissionControlRestHandler", new AdmissionControllerRestHandler()); + } + return HANDLERS; + } +} diff --git a/modules/throttling/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerRestHandler.java b/modules/throttling/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerRestHandler.java new file mode 100644 index 0000000000000..2a7d95aea0ba4 --- /dev/null +++ b/modules/throttling/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerRestHandler.java @@ -0,0 +1,63 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaderNames; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +@ChannelHandler.Sharable +public class AdmissionControllerRestHandler extends ChannelDuplexHandler { + private static final Logger LOGGER = LogManager.getLogger(AdmissionControllerRestHandler.class); + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + assert msg instanceof FullHttpRequest : "Invalid message type: " + msg.getClass(); + FullHttpRequest fullHttpRequest = (FullHttpRequest) msg; + String uri = getUri(fullHttpRequest); + applyAdmissionControl(uri); + ctx.fireChannelRead(msg); + } + + private String getUri(FullHttpRequest fullHttpRequest) { + return fullHttpRequest.uri(); + } + + private void applyAdmissionControl(String requestURI) { + // apply admission controller + // LOGGER.info("Apply Admission Controller Triggered URI: " + requestURI); + } + + private void releaseAdmissionControl(ChannelHandlerContext ctx) { + // release the acquired objects + // LOGGER.info("Released Admission Controller Handler"); + } + + private long getContentLength(FullHttpRequest fullHttpRequest) { + String contentLengthHeader = fullHttpRequest.headers().get(HttpHeaderNames.CONTENT_LENGTH); + return contentLengthHeader == null ? 0 : Long.parseLong(contentLengthHeader); + } + + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + releaseAdmissionControl(ctx); + super.close(ctx, promise); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + releaseAdmissionControl(ctx); + super.write(ctx, msg, promise); + } +} diff --git a/modules/throttling/src/main/plugin-metadata/plugin-security.policy b/modules/throttling/src/main/plugin-metadata/plugin-security.policy new file mode 100644 index 0000000000000..ccfd6ba70dd16 --- /dev/null +++ b/modules/throttling/src/main/plugin-metadata/plugin-security.policy @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +grant { + // needed to generate runtime classes + permission java.lang.RuntimePermission "createClassLoader"; + + // needed to find the classloader to load allowlisted classes from + permission java.lang.RuntimePermission "getClassLoader"; +}; diff --git a/modules/throttling/src/main/resources/META-INF/services/org.opensearch.transport.Netty4HandlerExtension b/modules/throttling/src/main/resources/META-INF/services/org.opensearch.transport.Netty4HandlerExtension new file mode 100644 index 0000000000000..c834c3dc37b6f --- /dev/null +++ b/modules/throttling/src/main/resources/META-INF/services/org.opensearch.transport.Netty4HandlerExtension @@ -0,0 +1,8 @@ +# +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. +# +org.opensearch.throttling.OpenSearchThrottlingModulePlugin diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java index 998c89590c53c..985794148397d 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java @@ -93,12 +93,17 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.NettyAllocator; import org.opensearch.transport.NettyByteBufSizer; +import org.opensearch.transport.NettySettings; import org.opensearch.transport.SharedGroupFactory; import org.opensearch.transport.netty4.Netty4Utils; import java.net.InetSocketAddress; import java.net.SocketOption; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE; import static org.opensearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH; @@ -182,6 +187,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private volatile ServerBootstrap serverBootstrap; private volatile SharedGroupFactory.SharedGroup sharedGroup; + private List channelHandlers; public Netty4HttpServerTransport( Settings settings, @@ -223,6 +229,25 @@ public Netty4HttpServerTransport( ); } + public Netty4HttpServerTransport( + Settings settings, + NetworkService networkService, + BigArrays bigArrays, + ThreadPool threadPool, + NamedXContentRegistry xContentRegistry, + Dispatcher dispatcher, + ClusterSettings clusterSettings, + SharedGroupFactory sharedGroupFactory, + Map channelHandlers + ) { + this(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher, clusterSettings, sharedGroupFactory); + this.channelHandlers = NettySettings.HANDLER_ORDERING.get(settings) + .stream() + .map(channelHandlers::get) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + } + public Settings settings() { return this.settings; } @@ -418,7 +443,6 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E final ChannelPipeline pipeline = ctx.pipeline(); pipeline.addAfter(ctx.name(), "handler", getRequestHandler()); pipeline.replace(this, "decoder_compress", new HttpContentDecompressor()); - pipeline.addAfter("decoder_compress", "aggregator", aggregator); if (handlingSettings.isCompression()) { pipeline.addAfter( @@ -430,7 +454,9 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E pipeline.addBefore("handler", "request_creator", requestCreator); pipeline.addBefore("handler", "response_creator", responseCreator); pipeline.addBefore("handler", "pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents)); - + transport.channelHandlers.forEach( + handler -> ch.pipeline().addBefore("request_creator", handler.getClass().getSimpleName(), handler) + ); ctx.fireChannelRead(ReferenceCountUtil.retain(msg)); } }); @@ -497,7 +523,6 @@ protected void initChannel(Channel childChannel) throws Exception { childChannel.pipeline() .addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel())); } - childChannel.pipeline() .addLast("aggregator", aggregator) .addLast("request_creator", requestCreator) diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4HandlerExtension.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4HandlerExtension.java new file mode 100644 index 0000000000000..0b0648363ddb9 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4HandlerExtension.java @@ -0,0 +1,20 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +import io.netty.channel.ChannelHandler; + +import java.util.Collections; +import java.util.Map; + +public interface Netty4HandlerExtension { + default Map getHandlers() { + return Collections.emptyMap(); + } +} diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java index ef60797bca067..f4f247db35b86 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/Netty4ModulePlugin.java @@ -32,6 +32,7 @@ package org.opensearch.transport; +import io.netty.channel.ChannelHandler; import org.opensearch.Version; import org.opensearch.common.SetOnce; import org.opensearch.core.common.io.stream.NamedWriteableRegistry; @@ -46,23 +47,43 @@ import org.opensearch.http.HttpServerTransport; import org.opensearch.http.netty4.Netty4HttpServerTransport; import org.opensearch.core.indices.breaker.CircuitBreakerService; +import org.opensearch.plugins.ExtensiblePlugin; import org.opensearch.plugins.NetworkPlugin; import org.opensearch.plugins.Plugin; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.netty4.Netty4Transport; -import java.util.Arrays; -import java.util.Collections; +import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import java.util.Arrays; import java.util.Map; +import java.util.Collections; +import java.util.HashMap; import java.util.function.Supplier; -public class Netty4ModulePlugin extends Plugin implements NetworkPlugin { +public class Netty4ModulePlugin extends Plugin implements NetworkPlugin, ExtensiblePlugin { public static final String NETTY_TRANSPORT_NAME = "netty4"; public static final String NETTY_HTTP_TRANSPORT_NAME = "netty4"; private final SetOnce groupFactory = new SetOnce<>(); + private static final List EXTENSIONS = new ArrayList<>(); + + @Override + public void loadExtensions(ExtensionLoader loader) { + Set uniqueNames = new HashSet<>(); + for (Netty4HandlerExtension extension : loader.loadExtensions(Netty4HandlerExtension.class)) { + String name = extension.getClass().getName(); + if (uniqueNames.contains(name)) { + continue; + } + Netty4ModulePlugin.EXTENSIONS.add(extension); + uniqueNames.add(name); + } + assert 1 == Netty4ModulePlugin.EXTENSIONS.size() : "More than 1 extensions are not supported"; + } @Override public List> getSettings() { @@ -124,6 +145,8 @@ public Map> getHttpTransports( HttpServerTransport.Dispatcher dispatcher, ClusterSettings clusterSettings ) { + Map channelHandlers = new HashMap<>(); + Netty4ModulePlugin.EXTENSIONS.forEach(extension -> channelHandlers.putAll(extension.getHandlers())); return Collections.singletonMap( NETTY_HTTP_TRANSPORT_NAME, () -> new Netty4HttpServerTransport( @@ -134,7 +157,8 @@ public Map> getHttpTransports( xContentRegistry, dispatcher, clusterSettings, - getSharedGroupFactory(settings) + getSharedGroupFactory(settings), + channelHandlers ) ); } diff --git a/modules/transport-netty4/src/main/java/org/opensearch/transport/NettySettings.java b/modules/transport-netty4/src/main/java/org/opensearch/transport/NettySettings.java new file mode 100644 index 0000000000000..02772842fcf72 --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/opensearch/transport/NettySettings.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.transport; + +import org.opensearch.common.settings.Setting; + +import java.util.Arrays; +import java.util.List; +import java.util.function.Function; + +public final class NettySettings { + private NettySettings() {} + + // TODO: Evaluate which one is better, Moving this to yml(AMI Config) or keeping it here. + public static final Setting> HANDLER_ORDERING = Setting.listSetting( + "opensearch.netty.plugin.handler.ordering", + Arrays.asList("opensearch-throttling-plugin:AdmissionControlRestHandler"), + Function.identity(), + Setting.Property.NodeScope + ); +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerTransportHandler.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerTransportHandler.java new file mode 100644 index 0000000000000..9830b7b378c5c --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerTransportHandler.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.apache.logging.log4j.LogManager; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; +import org.apache.logging.log4j.Logger; + +public class AdmissionControllerTransportHandler implements TransportRequestHandler { + + private final String action; + private final TransportRequestHandler actualHandler; + private final ThreadPool threadPool; + protected final Logger log = LogManager.getLogger(this.getClass()); + + public AdmissionControllerTransportHandler(String action, TransportRequestHandler actualHandler, ThreadPool threadPool) { + super(); + this.action = action; + this.actualHandler = actualHandler; + this.threadPool = threadPool; + } + + protected ThreadContext getThreadContext() { + if (threadPool == null) { + return null; + } + return threadPool.getThreadContext(); + } + + /** + * @param request + * @param channel + * @param task + * @throws Exception + */ + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + // intercept all the transport requests here and apply admission control + // log.info("Action:" + this.action); + this.messageReceivedDecorate(request, actualHandler, channel, task); + } + + protected void messageReceivedDecorate( + final T request, + final TransportRequestHandler actualHandler, + final TransportChannel transportChannel, + Task task + ) throws Exception { + actualHandler.messageReceived(request, transportChannel, task); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerTransportInterceptor.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerTransportInterceptor.java new file mode 100644 index 0000000000000..6a31e7b49e4a5 --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/AdmissionControllerTransportInterceptor.java @@ -0,0 +1,33 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; + +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +public class AdmissionControllerTransportInterceptor implements TransportInterceptor { + + protected final ThreadPool threadPool; + + public AdmissionControllerTransportInterceptor(final ThreadPool threadPool) { + this.threadPool = threadPool; + } + + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler + ) { + return new AdmissionControllerTransportHandler<>(action, actualHandler, threadPool); + } +} diff --git a/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java b/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java new file mode 100644 index 0000000000000..a6abeb832a38a --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/admissioncontroller/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling.admissioncontroller; diff --git a/server/src/main/java/org/opensearch/throttling/package-info.java b/server/src/main/java/org/opensearch/throttling/package-info.java new file mode 100644 index 0000000000000..85a43dc86789d --- /dev/null +++ b/server/src/main/java/org/opensearch/throttling/package-info.java @@ -0,0 +1,9 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.throttling; diff --git a/settings.gradle b/settings.gradle index c04b5997d49b1..009c8b732fdd5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -153,3 +153,6 @@ if (extraProjects.exists()) { addSubProjects('', extraProjectDir) } } +include 'modules:throttling' +findProject(':modules:throttling')?.name = 'throttling' +