From 02e69f8fdb42b25e42bb1452c45c185188c8649d Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Wed, 28 Aug 2024 17:01:01 -0400 Subject: [PATCH] Fix Netty's ByteBuf leak (#15475) Signed-off-by: Andriy Redko --- .../netty4/ReactorNetty4HttpChunk.java | 26 +++++++------------ .../ReactorNetty4StreamingHttpChannel.java | 1 + ...ReactorNetty4StreamingRequestConsumer.java | 2 +- 3 files changed, 11 insertions(+), 18 deletions(-) diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java index 3b4a308691e7b..1c4c1fd3e49e3 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java @@ -8,39 +8,31 @@ package org.opensearch.http.reactor.netty4; +import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.http.HttpChunk; -import org.opensearch.transport.reactor.netty4.Netty4Utils; - -import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; class ReactorNetty4HttpChunk implements HttpChunk { - private final AtomicBoolean released; - private final boolean pooled; - private final ByteBuf content; + private final BytesArray content; private final boolean last; - ReactorNetty4HttpChunk(ByteBuf content, boolean last) { - this.content = content; - this.pooled = true; - this.released = new AtomicBoolean(false); + ReactorNetty4HttpChunk(ByteBuf buf, boolean last) { + // Since the chunks could be batched and processing could be delayed, we are copying the content here + final byte[] content = new byte[buf.readableBytes()]; + buf.readBytes(content); + this.content = new BytesArray(content); this.last = last; } @Override public BytesReference content() { - assert released.get() == false; - return Netty4Utils.toBytesReference(content); + return content; } @Override - public void close() { - if (pooled && released.compareAndSet(false, true)) { - content.release(); - } - } + public void close() {} @Override public boolean isLast() { diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java index 12ed847c0c0de..1aa03aa9967e2 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java @@ -103,6 +103,7 @@ public void receiveChunk(HttpChunk message) { } } catch (final Exception ex) { producer.error(ex); + } finally { message.close(); } } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java index 282a82dc39fda..0559f89478838 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java @@ -44,7 +44,7 @@ public void subscribe(Subscriber s) { } HttpChunk createChunk(HttpContent chunk, boolean last) { - return new ReactorNetty4HttpChunk(chunk.copy().content().retain(), last); + return new ReactorNetty4HttpChunk(chunk.content(), last); } StreamingHttpChannel httpChannel() {