Skip to content

Commit

Permalink
Closing a ReleasableBytesStreamOutput closes the underlying BigArray (e…
Browse files Browse the repository at this point in the history
…lastic#23941)

This commit makes closing a ReleasableBytesStreamOutput release the underlying BigArray so
that we can use try-with-resources with these streams and avoid leaking memory by not returning
the BigArray. As part of this change, the ReleasableBytesStreamOutput adds protection to only
release the BigArray once.

In order to make some of the changes cleaner, the ReleasableBytesStream interface has been
removed. The BytesStream interface is changed to a abstract class so that we can use it as a
useable return type for a new method, Streams#flushOnCloseStream. This new method wraps a
given stream and overrides the close method so that the stream is simply flushed and not closed.
This behavior is used in the TcpTransport when compression is used with a
ReleasableBytesStreamOutput as we need to close the compressed stream to ensure all of the data
is written from this stream. Closing the compressed stream will try to close the underlying stream
but we only want to flush so that all of the written bytes are available.

Additionally, an error message method added in the BytesRestResponse did not use a builder
provided by the channel and instead created its own JSON builder. This changes that method to use
the channel builder and in turn the bytes stream output that is managed by the channel.

Note, this commit differs from 6bfecdf in that it updates
ReleasableBytesStreamOutput to handle the case of the BigArray decreasing in size, which changes
the reference to the BigArray. When the reference is changed, the releasable needs to be updated
otherwise there could be a leak of bytes and corruption of data in unrelated streams.

This reverts commit afd45c1, which reverted elastic#23572.
  • Loading branch information
jaymode authored Apr 14, 2017
1 parent e3aa2a8 commit 30ab873
Show file tree
Hide file tree
Showing 17 changed files with 274 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@
*/
public final class ReleasablePagedBytesReference extends PagedBytesReference implements Releasable {

public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length) {
private final Releasable releasable;

public ReleasablePagedBytesReference(BigArrays bigarrays, ByteArray byteArray, int length,
Releasable releasable) {
super(bigarrays, byteArray, length);
this.releasable = releasable;
}

@Override
public void close() {
Releasables.close(byteArray);
Releasables.close(releasable);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.common.compress;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

Expand All @@ -31,5 +32,9 @@ public interface Compressor {

StreamInput streamInput(StreamInput in) throws IOException;

/**
* Creates a new stream output that compresses the contents and writes to the provided stream
* output. Closing the returned {@link StreamOutput} will close the provided stream output.
*/
StreamOutput streamOutput(StreamOutput out) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.elasticsearch.common.compress;

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -47,7 +46,7 @@ public class DeflateCompressor implements Compressor {
// It needs to be different from other compressors and to not be specific
// enough so that no stream starting with these bytes could be detected as
// a XContent
private static final byte[] HEADER = new byte[] { 'D', 'F', 'L', '\0' };
private static final byte[] HEADER = new byte[]{'D', 'F', 'L', '\0'};
// 3 is a good trade-off between speed and compression ratio
private static final int LEVEL = 3;
// We use buffering on the input and output of in/def-laters in order to
Expand Down Expand Up @@ -88,6 +87,7 @@ public StreamInput streamInput(StreamInput in) throws IOException {
decompressedIn = new BufferedInputStream(decompressedIn, BUFFER_SIZE);
return new InputStreamStreamInput(decompressedIn) {
final AtomicBoolean closed = new AtomicBoolean(false);

public void close() throws IOException {
try {
super.close();
Expand All @@ -107,10 +107,11 @@ public StreamOutput streamOutput(StreamOutput out) throws IOException {
final boolean nowrap = true;
final Deflater deflater = new Deflater(LEVEL, nowrap);
final boolean syncFlush = true;
OutputStream compressedOut = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
compressedOut = new BufferedOutputStream(compressedOut, BUFFER_SIZE);
DeflaterOutputStream deflaterOutputStream = new DeflaterOutputStream(out, deflater, BUFFER_SIZE, syncFlush);
OutputStream compressedOut = new BufferedOutputStream(deflaterOutputStream, BUFFER_SIZE);
return new OutputStreamStreamOutput(compressedOut) {
final AtomicBoolean closed = new AtomicBoolean(false);

public void close() throws IOException {
try {
super.close();
Expand Down

This file was deleted.

55 changes: 55 additions & 0 deletions core/src/main/java/org/elasticsearch/common/io/Streams.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
package org.elasticsearch.common.io;

import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.Callback;

import java.io.BufferedReader;
Expand Down Expand Up @@ -236,4 +239,56 @@ public static void readAllLines(InputStream input, Callback<String> callback) th
}
}
}

/**
* Wraps the given {@link BytesStream} in a {@link StreamOutput} that simply flushes when
* close is called.
*/
public static BytesStream flushOnCloseStream(BytesStream os) {
return new FlushOnCloseOutputStream(os);
}

/**
* A wrapper around a {@link BytesStream} that makes the close operation a flush. This is
* needed as sometimes a stream will be closed but the bytes that the stream holds still need
* to be used and the stream cannot be closed until the bytes have been consumed.
*/
private static class FlushOnCloseOutputStream extends BytesStream {

private final BytesStream delegate;

private FlushOnCloseOutputStream(BytesStream bytesStreamOutput) {
this.delegate = bytesStreamOutput;
}

@Override
public void writeByte(byte b) throws IOException {
delegate.writeByte(b);
}

@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
delegate.writeBytes(b, offset, length);
}

@Override
public void flush() throws IOException {
delegate.flush();
}

@Override
public void close() throws IOException {
flush();
}

@Override
public void reset() throws IOException {
delegate.reset();
}

@Override
public BytesReference bytes() {
return delegate.bytes();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
* under the License.
*/

package org.elasticsearch.common.io;
package org.elasticsearch.common.io.stream;

import org.elasticsearch.common.bytes.BytesReference;

public interface BytesStream {
public abstract class BytesStream extends StreamOutput {

BytesReference bytes();
}
public abstract BytesReference bytes();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.PagedBytesReference;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;

Expand All @@ -31,7 +30,7 @@
* A @link {@link StreamOutput} that uses {@link BigArrays} to acquire pages of
* bytes, which avoids frequent reallocation &amp; copying of the internal data.
*/
public class BytesStreamOutput extends StreamOutput implements BytesStream {
public class BytesStreamOutput extends BytesStream {

protected final BigArrays bigArrays;

Expand All @@ -50,7 +49,7 @@ public BytesStreamOutput() {
/**
* Create a non recycling {@link BytesStreamOutput} with enough initial pages acquired
* to satisfy the capacity given by expected size.
*
*
* @param expectedSize the expected maximum size of the stream in bytes.
*/
public BytesStreamOutput(int expectedSize) {
Expand Down Expand Up @@ -129,7 +128,7 @@ public void close() {

/**
* Returns the current size of the buffer.
*
*
* @return the value of the <code>count</code> field, which is the number of valid
* bytes in this output stream.
* @see java.io.ByteArrayOutputStream#count
Expand All @@ -151,7 +150,7 @@ public long ramBytesUsed() {
return bytes.ramBytesUsed();
}

private void ensureCapacity(long offset) {
void ensureCapacity(long offset) {
if (offset > Integer.MAX_VALUE) {
throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,66 @@
package org.elasticsearch.common.io.stream;

import org.elasticsearch.common.bytes.ReleasablePagedBytesReference;
import org.elasticsearch.common.io.ReleasableBytesStream;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;

/**
* An bytes stream output that allows providing a {@link BigArrays} instance
* expecting it to require releasing its content ({@link #bytes()}) once done.
* <p>
* Please note, its is the responsibility of the caller to make sure the bytes
* reference do not "escape" and are released only once.
* Please note, closing this stream will release the bytes that are in use by any
* {@link ReleasablePagedBytesReference} returned from {@link #bytes()}, so this
* stream should only be closed after the bytes have been output or copied
* elsewhere.
*/
public class ReleasableBytesStreamOutput extends BytesStreamOutput implements ReleasableBytesStream {
public class ReleasableBytesStreamOutput extends BytesStreamOutput
implements Releasable {

private Releasable releasable;

public ReleasableBytesStreamOutput(BigArrays bigarrays) {
super(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
this(BigArrays.PAGE_SIZE_IN_BYTES, bigarrays);
}

public ReleasableBytesStreamOutput(int expectedSize, BigArrays bigArrays) {
super(expectedSize, bigArrays);
this.releasable = Releasables.releaseOnce(this.bytes);
}

/**
* Returns a {@link Releasable} implementation of a
* {@link org.elasticsearch.common.bytes.BytesReference} that represents the current state of
* the bytes in the stream.
*/
@Override
public ReleasablePagedBytesReference bytes() {
return new ReleasablePagedBytesReference(bigArrays, bytes, count);
return new ReleasablePagedBytesReference(bigArrays, bytes, count, releasable);
}

@Override
public void close() {
Releasables.close(releasable);
}

@Override
void ensureCapacity(long offset) {
final ByteArray prevBytes = this.bytes;
super.ensureCapacity(offset);
if (prevBytes != this.bytes) {
// re-create the releasable with the new reference
releasable = Releasables.releaseOnce(this.bytes);
}
}

@Override
public void reset() {
final ByteArray prevBytes = this.bytes;
super.reset();
if (prevBytes != this.bytes) {
// re-create the releasable with the new reference
releasable = Releasables.releaseOnce(this.bytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.BytesStream;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.text.Text;
Expand Down Expand Up @@ -53,7 +53,7 @@
/**
* A utility to build XContent (ie json).
*/
public final class XContentBuilder implements BytesStream, Releasable, Flushable {
public final class XContentBuilder implements Releasable, Flushable {

/**
* Create a new {@link XContentBuilder} using the given {@link XContent} content.
Expand Down Expand Up @@ -1041,7 +1041,6 @@ public XContentGenerator generator() {
return this.generator;
}

@Override
public BytesReference bytes() {
close();
return ((BytesStream) bos).bytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public Location add(final Operation operation) throws IOException {
}
throw new TranslogException(shardId, "Failed to write operation [" + operation + "]", e);
} finally {
Releasables.close(out.bytes());
Releasables.close(out);
}
}

Expand Down Expand Up @@ -1332,7 +1332,7 @@ public static void writeOperations(StreamOutput outStream, List<Operation> toWri
bytes.writeTo(outStream);
}
} finally {
Releasables.close(out.bytes());
Releasables.close(out);
}

}
Expand Down
Loading

0 comments on commit 30ab873

Please sign in to comment.