Skip to content

Commit

Permalink
Make ChunkedRestResponseBody extend Releasable (elastic#99871)
Browse files Browse the repository at this point in the history
If an unchunked body implements `Releasable` then it is released once
the response is sent. We have some need for the same behaviour with
chunked bodies, except that in this case there's no need for an
`instanceof` check since we control the body type directly. This commit
makes `ChunkedRestResponseBody extend Releasable` and adds support for
closing it when the response is sent.
  • Loading branch information
DaveCTurner authored Sep 25, 2023
1 parent c584741 commit 2458118
Show file tree
Hide file tree
Showing 7 changed files with 139 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,9 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return "application/octet-stream";
}

@Override
public void close() {}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void sendResponse(RestResponse restResponse) {
final HttpResponse httpResponse;
if (isHeadRequest == false && restResponse.isChunked()) {
ChunkedRestResponseBody chunkedContent = restResponse.chunkedContent();
toClose.add(chunkedContent);
if (httpLogger != null && httpLogger.isBodyTracerEnabled()) {
final var loggerStream = httpLogger.openResponseBodyLoggingStream(request.getRequestId());
toClose.add(() -> {
Expand All @@ -132,8 +133,10 @@ public void sendResponse(RestResponse restResponse) {
httpResponse = httpRequest.createResponse(restResponse.status(), chunkedContent);
} else {
final BytesReference content = restResponse.content();
if (content instanceof Releasable) {
toClose.add((Releasable) content);
if (content instanceof Releasable releasable) {
toClose.add(releasable);
} else if (restResponse.isChunked()) {
toClose.add(restResponse.chunkedContent());
}
toClose.add(this::releaseOutputBuffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.CheckedConsumer;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.Streams;
Expand All @@ -32,7 +34,7 @@
* The body of a rest response that uses chunked HTTP encoding. Implementations are used to avoid materializing full responses on heap and
* instead serialize only as much of the response as can be flushed to the network right away.
*/
public interface ChunkedRestResponseBody {
public interface ChunkedRestResponseBody extends Releasable {

/**
* @return true once this response has been written fully.
Expand Down Expand Up @@ -62,9 +64,29 @@ public interface ChunkedRestResponseBody {
* @param params parameters to use for serialization
* @param channel channel the response will be written to
* @return chunked rest response body
* @deprecated Use {@link #fromXContent(ChunkedToXContent, ToXContent.Params, RestChannel, Releasable)} instead.
*/
@Deprecated(forRemoval = true)
static ChunkedRestResponseBody fromXContent(ChunkedToXContent chunkedToXContent, ToXContent.Params params, RestChannel channel)
throws IOException {
return fromXContent(chunkedToXContent, params, channel, null);
}

/**
* Create a chunked response body to be written to a specific {@link RestChannel} from a {@link ChunkedToXContent}.
*
* @param chunkedToXContent chunked x-content instance to serialize
* @param params parameters to use for serialization
* @param channel channel the response will be written to
* @param releasable resource to release when the response is fully sent, or {@code null} if nothing to release
* @return chunked rest response body
*/
static ChunkedRestResponseBody fromXContent(
ChunkedToXContent chunkedToXContent,
ToXContent.Params params,
RestChannel channel,
@Nullable Releasable releasable
) throws IOException {

return new ChunkedRestResponseBody() {

Expand Down Expand Up @@ -132,14 +154,34 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return builder.getResponseContentTypeString();
}

@Override
public void close() {
Releasables.closeExpectNoException(releasable);
}
};
}

/**
* Create a chunked response body to be written to a specific {@link RestChannel} from a stream of text chunks, each represented as a
* consumer of a {@link Writer}. The last chunk that the iterator yields must write at least one byte.
*
* @deprecated Use {@link #fromTextChunks(String, Iterator, Releasable)} instead.
*/
@Deprecated(forRemoval = true)
static ChunkedRestResponseBody fromTextChunks(String contentType, Iterator<CheckedConsumer<Writer, IOException>> chunkIterator) {
return fromTextChunks(contentType, chunkIterator, null);
}

/**
* Create a chunked response body to be written to a specific {@link RestChannel} from a stream of text chunks, each represented as a
* consumer of a {@link Writer}. The last chunk that the iterator yields must write at least one byte.
*/
static ChunkedRestResponseBody fromTextChunks(
String contentType,
Iterator<CheckedConsumer<Writer, IOException>> chunkIterator,
@Nullable Releasable releasable
) {
return new ChunkedRestResponseBody() {
private RecyclerBytesStreamOutput currentOutput;
private final Writer writer = new OutputStreamWriter(new OutputStream() {
Expand Down Expand Up @@ -209,6 +251,11 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return contentType;
}

@Override
public void close() {
Releasables.closeExpectNoException(releasable);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,9 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return inner.getResponseContentTypeString();
}

@Override
public void close() {
inner.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContent;
Expand Down Expand Up @@ -81,7 +82,11 @@ public RestResponse(RestStatus status, String responseMediaType, BytesReference

public static RestResponse chunked(RestStatus restStatus, ChunkedRestResponseBody content) {
if (content.isDone()) {
return new RestResponse(restStatus, content.getResponseContentTypeString(), BytesArray.EMPTY);
return new RestResponse(
restStatus,
content.getResponseContentTypeString(),
new ReleasableBytesReference(BytesArray.EMPTY, content)
);
} else {
return new RestResponse(restStatus, content.getResponseContentTypeString(), null, content);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.test.ActionListenerUtils.anyActionListener;
Expand Down Expand Up @@ -525,6 +526,7 @@ public void testHandleHeadRequest() {
}
{
// chunked response
final var isClosed = new AtomicBoolean();
channel.sendResponse(RestResponse.chunked(RestStatus.OK, new ChunkedRestResponseBody() {

@Override
Expand All @@ -541,11 +543,28 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return RestResponse.TEXT_CONTENT_TYPE;
}

@Override
public void close() {
assertTrue(isClosed.compareAndSet(false, true));
}
}));
verify(httpChannel, times(2)).sendResponse(requestCaptor.capture(), any());
@SuppressWarnings("unchecked")
Class<ActionListener<Void>> listenerClass = (Class<ActionListener<Void>>) (Class<?>) ActionListener.class;
ArgumentCaptor<ActionListener<Void>> listenerCaptor = ArgumentCaptor.forClass(listenerClass);
verify(httpChannel, times(2)).sendResponse(requestCaptor.capture(), listenerCaptor.capture());
HttpResponse response = requestCaptor.getValue();
assertThat(response, instanceOf(TestHttpResponse.class));
assertThat(((TestHttpResponse) response).content().length(), equalTo(0));

ActionListener<Void> listener = listenerCaptor.getValue();
assertFalse(isClosed.get());
if (randomBoolean()) {
listener.onResponse(null);
} else {
listener.onFailure(new ClosedChannelException());
}
assertTrue(isClosed.get());
}
}

Expand Down Expand Up @@ -703,6 +722,7 @@ public HttpResponse createResponse(RestStatus status, ChunkedRestResponseBody co
)
);

final var isClosed = new AtomicBoolean();
assertEquals(
responseBody,
ChunkedLoggingStreamTests.getDecodedLoggedBody(
Expand Down Expand Up @@ -730,10 +750,16 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
public String getResponseContentTypeString() {
return RestResponse.TEXT_CONTENT_TYPE;
}

@Override
public void close() {
assertTrue(isClosed.compareAndSet(false, true));
}
}))
)
);

assertTrue(isClosed.get());
}

private TestHttpResponse executeRequest(final Settings settings, final String host) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

public class ChunkedRestResponseBodyTests extends ESTestCase {

Expand All @@ -50,40 +51,59 @@ public void testEncodesChunkedXContentCorrectly() throws IOException {
}
final var bytesDirect = BytesReference.bytes(builderDirect);

final var chunkedResponse = ChunkedRestResponseBody.fromXContent(
chunkedToXContent,
ToXContent.EMPTY_PARAMS,
new FakeRestChannel(
new FakeRestRequest.Builder(xContentRegistry()).withContent(BytesArray.EMPTY, randomXContent.type()).build(),
randomBoolean(),
1
final var isClosed = new AtomicBoolean();
try (
var chunkedResponse = ChunkedRestResponseBody.fromXContent(
chunkedToXContent,
ToXContent.EMPTY_PARAMS,
new FakeRestChannel(
new FakeRestRequest.Builder(xContentRegistry()).withContent(BytesArray.EMPTY, randomXContent.type()).build(),
randomBoolean(),
1
),
() -> assertTrue(isClosed.compareAndSet(false, true))
)
);
) {

final List<BytesReference> refsGenerated = new ArrayList<>();
while (chunkedResponse.isDone() == false) {
refsGenerated.add(chunkedResponse.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE));
}
final List<BytesReference> refsGenerated = new ArrayList<>();
while (chunkedResponse.isDone() == false) {
refsGenerated.add(chunkedResponse.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE));
}

assertEquals(bytesDirect, CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0])));
assertEquals(bytesDirect, CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0])));
assertFalse(isClosed.get());
}
assertTrue(isClosed.get());
}

public void testFromTextChunks() throws IOException {
final var chunks = randomList(1000, () -> randomUnicodeOfLengthBetween(1, 100));
final var body = ChunkedRestResponseBody.fromTextChunks("text/plain", Iterators.map(chunks.iterator(), s -> w -> w.write(s)));

final List<BytesReference> refsGenerated = new ArrayList<>();
while (body.isDone() == false) {
refsGenerated.add(body.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE));
}
final BytesReference chunkedBytes = CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0]));
final var isClosed = new AtomicBoolean();
try (
var body = ChunkedRestResponseBody.fromTextChunks(
"text/plain",
Iterators.map(chunks.iterator(), s -> w -> w.write(s)),
() -> assertTrue(isClosed.compareAndSet(false, true))
)
) {
final List<BytesReference> refsGenerated = new ArrayList<>();
while (body.isDone() == false) {
refsGenerated.add(body.encodeChunk(randomIntBetween(2, 10), BytesRefRecycler.NON_RECYCLING_INSTANCE));
}
final BytesReference chunkedBytes = CompositeBytesReference.of(refsGenerated.toArray(new BytesReference[0]));

try (var outputStream = new ByteArrayOutputStream(); var writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)) {
for (final var chunk : chunks) {
writer.write(chunk);
try (
var outputStream = new ByteArrayOutputStream();
var writer = new OutputStreamWriter(outputStream, StandardCharsets.UTF_8)
) {
for (final var chunk : chunks) {
writer.write(chunk);
}
writer.flush();
assertEquals(new BytesArray(outputStream.toByteArray()), chunkedBytes);
}
writer.flush();
assertEquals(new BytesArray(outputStream.toByteArray()), chunkedBytes);
assertFalse(isClosed.get());
}
assertTrue(isClosed.get());
}
}

0 comments on commit 2458118

Please sign in to comment.