Skip to content

Commit

Permalink
Remove createDecompressor extension point in favor of attributeKey th…
Browse files Browse the repository at this point in the history
…at can be populated by an extending transport

Signed-off-by: Craig Perkins <[email protected]>
  • Loading branch information
cwperks committed Sep 29, 2023
1 parent 2aba797 commit d1d48b1
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package org.opensearch.http.netty4;

import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http.HttpContentDecompressor;

import static org.opensearch.http.netty4.Netty4HttpServerTransport.SHOULD_DECOMPRESS;

public class Netty4ConditionalDecompressor extends HttpContentDecompressor {
@Override
protected EmbeddedChannel newContentDecoder(String contentEncoding) throws Exception {
if (Boolean.FALSE.equals(ctx.channel().attr(SHOULD_DECOMPRESS).get())) {
return super.newContentDecoder("identity");
}
return super.newContentDecoder(contentEncoding);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
Expand Down Expand Up @@ -345,6 +344,7 @@ public ChannelHandler configureServerChannelHandler() {
public static final AttributeKey<ThreadContext.StoredContext> CONTEXT_TO_RESTORE = AttributeKey.newInstance(
"opensearch-http-request-thread-context"
);
public static final AttributeKey<Boolean> SHOULD_DECOMPRESS = AttributeKey.newInstance("opensearch-http-should-decompress");

protected static class HttpChannelHandler extends ChannelInitializer<Channel> {

Expand Down Expand Up @@ -427,10 +427,14 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E
final ChannelPipeline pipeline = ctx.pipeline();
pipeline.addAfter(ctx.name(), "handler", getRequestHandler());
pipeline.replace(this, "header_verifier", transport.createHeaderVerifier());
pipeline.addAfter("header_verifier", "decompress", transport.createDecompressor());
pipeline.addAfter("decompress", "aggregator", aggregator);
pipeline.addAfter("header_verifier", "decoder_compress", new Netty4ConditionalDecompressor());
pipeline.addAfter("decoder_compress", "aggregator", aggregator);
if (handlingSettings.isCompression()) {
pipeline.addAfter("aggregator", "compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
pipeline.addAfter(
"aggregator",
"encoder_compress",
new HttpContentCompressor(handlingSettings.getCompressionLevel())
);
}
pipeline.addBefore("handler", "request_creator", requestCreator);
pipeline.addBefore("handler", "response_creator", responseCreator);
Expand All @@ -450,13 +454,13 @@ protected void configureDefaultHttpPipeline(ChannelPipeline pipeline) {
decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
pipeline.addLast("decoder", decoder);
pipeline.addLast("header_verifier", transport.createHeaderVerifier());
pipeline.addLast("decompress", transport.createDecompressor());
pipeline.addLast("decoder_compress", new Netty4ConditionalDecompressor());
pipeline.addLast("encoder", new HttpResponseEncoder());
final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
pipeline.addLast("aggregator", aggregator);
if (handlingSettings.isCompression()) {
pipeline.addLast("compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
pipeline.addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
}
pipeline.addLast("request_creator", requestCreator);
pipeline.addLast("response_creator", responseCreator);
Expand Down Expand Up @@ -497,7 +501,7 @@ protected void initChannel(Channel childChannel) throws Exception {
.addLast("byte_buf_sizer", byteBufSizer)
.addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS))
.addLast("header_verifier", transport.createHeaderVerifier())
.addLast("decompress", transport.createDecompressor());
.addLast("decoder_compress", new Netty4ConditionalDecompressor());

if (handlingSettings.isCompression()) {
childChannel.pipeline().addLast("compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
Expand Down Expand Up @@ -535,10 +539,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
}
}

protected HttpContentDecompressor createDecompressor() {
return new HttpContentDecompressor();
}

protected ChannelInboundHandlerAdapter createHeaderVerifier() {
// pass-through
return new ChannelInboundHandlerAdapter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,6 @@ public static RestRequest createRestRequest(
final HttpRequest httpRequest,
final HttpChannel httpChannel
) {
// TODO Figure out how to only generate one request ID for each request in the pipeline.
Exception badRequestCause = httpRequest.getInboundException();

/*
Expand Down

0 comments on commit d1d48b1

Please sign in to comment.