diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java index 3b4a308691e7b..1c4c1fd3e49e3 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4HttpChunk.java @@ -8,39 +8,31 @@ package org.opensearch.http.reactor.netty4; +import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.http.HttpChunk; -import org.opensearch.transport.reactor.netty4.Netty4Utils; - -import java.util.concurrent.atomic.AtomicBoolean; import io.netty.buffer.ByteBuf; class ReactorNetty4HttpChunk implements HttpChunk { - private final AtomicBoolean released; - private final boolean pooled; - private final ByteBuf content; + private final BytesArray content; private final boolean last; - ReactorNetty4HttpChunk(ByteBuf content, boolean last) { - this.content = content; - this.pooled = true; - this.released = new AtomicBoolean(false); + ReactorNetty4HttpChunk(ByteBuf buf, boolean last) { + // Since the chunks could be batched and processing could be delayed, we are copying the content here + final byte[] content = new byte[buf.readableBytes()]; + buf.readBytes(content); + this.content = new BytesArray(content); this.last = last; } @Override public BytesReference content() { - assert released.get() == false; - return Netty4Utils.toBytesReference(content); + return content; } @Override - public void close() { - if (pooled && released.compareAndSet(false, true)) { - content.release(); - } - } + public void close() {} @Override public boolean isLast() { diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java index 12ed847c0c0de..1aa03aa9967e2 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingHttpChannel.java @@ -103,6 +103,7 @@ public void receiveChunk(HttpChunk message) { } } catch (final Exception ex) { producer.error(ex); + } finally { message.close(); } } diff --git a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java index 282a82dc39fda..0559f89478838 100644 --- a/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java +++ b/plugins/transport-reactor-netty4/src/main/java/org/opensearch/http/reactor/netty4/ReactorNetty4StreamingRequestConsumer.java @@ -44,7 +44,7 @@ public void subscribe(Subscriber s) { } HttpChunk createChunk(HttpContent chunk, boolean last) { - return new ReactorNetty4HttpChunk(chunk.copy().content().retain(), last); + return new ReactorNetty4HttpChunk(chunk.content(), last); } StreamingHttpChannel httpChannel() {