From d1d48b1743c37dc6d908d318d1556e97edfd8bc6 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Fri, 29 Sep 2023 04:45:28 -0400 Subject: [PATCH] Remove createDecompressor extension point in favor of attributeKey that can be populated by an extending transport Signed-off-by: Craig Perkins --- .../netty4/Netty4ConditionalDecompressor.java | 16 ++++++++++++++ .../netty4/Netty4HttpServerTransport.java | 22 +++++++++---------- .../http/AbstractHttpServerTransport.java | 1 - 3 files changed, 27 insertions(+), 12 deletions(-) create mode 100644 modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4ConditionalDecompressor.java diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4ConditionalDecompressor.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4ConditionalDecompressor.java new file mode 100644 index 0000000000000..57e08134ab16c --- /dev/null +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4ConditionalDecompressor.java @@ -0,0 +1,16 @@ +package org.opensearch.http.netty4; + +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http.HttpContentDecompressor; + +import static org.opensearch.http.netty4.Netty4HttpServerTransport.SHOULD_DECOMPRESS; + +public class Netty4ConditionalDecompressor extends HttpContentDecompressor { + @Override + protected EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception { + if (Boolean.FALSE.equals(ctx.channel().attr(SHOULD_DECOMPRESS).get())) { + return super.newContentDecoder("identity"); + } + return super.newContentDecoder(contentEncoding); + } +} diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java index 206f98017e8ec..d38cd1609efcf 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java @@ -80,7 +80,6 @@ import io.netty.channel.socket.nio.NioChannelOption; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.http.HttpContentCompressor; -import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; @@ -345,6 +344,7 @@ public ChannelHandler configureServerChannelHandler() { public static final AttributeKey CONTEXT_TO_RESTORE = AttributeKey.newInstance( "opensearch-http-request-thread-context" ); + public static final AttributeKey SHOULD_DECOMPRESS = AttributeKey.newInstance("opensearch-http-should-decompress"); protected static class HttpChannelHandler extends ChannelInitializer { @@ -427,10 +427,14 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E final ChannelPipeline pipeline = ctx.pipeline(); pipeline.addAfter(ctx.name(), "handler", getRequestHandler()); pipeline.replace(this, "header_verifier", transport.createHeaderVerifier()); - pipeline.addAfter("header_verifier", "decompress", transport.createDecompressor()); - pipeline.addAfter("decompress", "aggregator", aggregator); + pipeline.addAfter("header_verifier", "decoder_compress", new Netty4ConditionalDecompressor()); + pipeline.addAfter("decoder_compress", "aggregator", aggregator); if (handlingSettings.isCompression()) { - pipeline.addAfter("aggregator", "compress", new HttpContentCompressor(handlingSettings.getCompressionLevel())); + pipeline.addAfter( + "aggregator", + "encoder_compress", + new HttpContentCompressor(handlingSettings.getCompressionLevel()) + ); } pipeline.addBefore("handler", "request_creator", requestCreator); pipeline.addBefore("handler", "response_creator", responseCreator); @@ -450,13 +454,13 @@ protected void configureDefaultHttpPipeline(ChannelPipeline pipeline) { decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); pipeline.addLast("decoder", decoder); pipeline.addLast("header_verifier", transport.createHeaderVerifier()); - pipeline.addLast("decompress", transport.createDecompressor()); + pipeline.addLast("decoder_compress", new Netty4ConditionalDecompressor()); pipeline.addLast("encoder", new HttpResponseEncoder()); final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength()); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); pipeline.addLast("aggregator", aggregator); if (handlingSettings.isCompression()) { - pipeline.addLast("compress", new HttpContentCompressor(handlingSettings.getCompressionLevel())); + pipeline.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel())); } pipeline.addLast("request_creator", requestCreator); pipeline.addLast("response_creator", responseCreator); @@ -497,7 +501,7 @@ protected void initChannel(Channel childChannel) throws Exception { .addLast("byte_buf_sizer", byteBufSizer) .addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS)) .addLast("header_verifier", transport.createHeaderVerifier()) - .addLast("decompress", transport.createDecompressor()); + .addLast("decoder_compress", new Netty4ConditionalDecompressor()); if (handlingSettings.isCompression()) { childChannel.pipeline().addLast("compress", new HttpContentCompressor(handlingSettings.getCompressionLevel())); @@ -535,10 +539,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } } - protected HttpContentDecompressor createDecompressor() { - return new HttpContentDecompressor(); - } - protected ChannelInboundHandlerAdapter createHeaderVerifier() { // pass-through return new ChannelInboundHandlerAdapter(); diff --git a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java index 61c2fa01c9c06..d4c3a8c79c5db 100644 --- a/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java +++ b/server/src/main/java/org/opensearch/http/AbstractHttpServerTransport.java @@ -490,7 +490,6 @@ public static RestRequest createRestRequest( final HttpRequest httpRequest, final HttpChannel httpChannel ) { - // TODO Figure out how to only generate one request ID for each request in the pipeline. Exception badRequestCause = httpRequest.getInboundException(); /*