From cd9e72f8635eb7aa1937c39a57cf782f39754bc0 Mon Sep 17 00:00:00 2001 From: Craig Perkins Date: Wed, 27 Sep 2023 22:01:05 -0400 Subject: [PATCH] Create new instance of each inbound handler Signed-off-by: Craig Perkins --- .../netty4/Netty4HttpServerTransport.java | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java index 6a3db18086a49..88ecaafad2597 100644 --- a/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java +++ b/modules/transport-netty4/src/main/java/org/opensearch/http/netty4/Netty4HttpServerTransport.java @@ -186,9 +186,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport { private volatile ServerBootstrap serverBootstrap; private volatile SharedGroupFactory.SharedGroup sharedGroup; - private static final HttpContentDecompressor DEFAULT_DECOMPRESSOR = new HttpContentDecompressor(); - private static final ChannelInboundHandlerAdapter DEFAULT_HEADER_VERIFIER = new ChannelInboundHandlerAdapter(); - public Netty4HttpServerTransport( Settings settings, NetworkService networkService, @@ -429,8 +426,8 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpMessage msg) throws E // If this handler is hit then no upgrade has been attempted and the client is just talking HTTP final ChannelPipeline pipeline = ctx.pipeline(); pipeline.addAfter(ctx.name(), "handler", getRequestHandler()); - pipeline.replace(this, "header_verifier", transport.getHeaderVerifier()); - pipeline.addAfter("header_verifier", "decompress", transport.getDecompressor()); + pipeline.replace(this, "header_verifier", transport.creategetHeaderVerifier()); + pipeline.addAfter("header_verifier", "decompress", transport.createDecompressor()); pipeline.addAfter("decompress", "aggregator", aggregator); if (handlingSettings.isCompression()) { pipeline.addAfter("aggregator", "compress", new HttpContentCompressor(handlingSettings.getCompressionLevel())); @@ -452,8 +449,8 @@ protected void configureDefaultHttpPipeline(ChannelPipeline pipeline) { ); decoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR); pipeline.addLast("decoder", decoder); - pipeline.addLast("header_verifier", transport.getHeaderVerifier()); - pipeline.addLast("decompress", transport.getDecompressor()); + pipeline.addLast("header_verifier", transport.creategetHeaderVerifier()); + pipeline.addLast("decompress", transport.createDecompressor()); pipeline.addLast("encoder", new HttpResponseEncoder()); final HttpObjectAggregator aggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength()); aggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents); @@ -499,8 +496,8 @@ protected void initChannel(Channel childChannel) throws Exception { .addLast(new Http2StreamFrameToHttpObjectCodec(true)) .addLast("byte_buf_sizer", byteBufSizer) .addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS)) - .addLast("header_verifier", transport.getHeaderVerifier()) - .addLast("decompress", transport.getDecompressor()); + .addLast("header_verifier", transport.creategetHeaderVerifier()) + .addLast("decompress", transport.createDecompressor()); if (handlingSettings.isCompression()) { childChannel.pipeline().addLast("compress", new HttpContentCompressor(handlingSettings.getCompressionLevel())); @@ -538,12 +535,12 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { } } - protected HttpContentDecompressor getDecompressor() { - return DEFAULT_DECOMPRESSOR; + protected HttpContentDecompressor createDecompressor() { + return new HttpContentDecompressor(); } - protected ChannelInboundHandlerAdapter getHeaderVerifier() { + protected ChannelInboundHandlerAdapter creategetHeaderVerifier() { // pass-through - return DEFAULT_HEADER_VERIFIER; + return new ChannelInboundHandlerAdapter(); } }