From 20becc553afe64410130eed039b79790328d9f4c Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Sat, 22 Nov 2014 10:59:44 -0500 Subject: [PATCH 1/5] Remove unnecessary calls to CodecPool.returnCompressor/returnDecompressor to avoid race conditions The input/output stream implementations erroneously add the (de)compressors back to the CodecPool on close. The user who creates the (de)compressor is responsibile for doing this, and if they return a decompressor as well, you will have the same instance in the pool twice. --- randdata | 0 src/main/java/com/hadoop/compression/lzo/LzopInputStream.java | 3 --- src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java | 3 --- 3 files changed, 6 deletions(-) create mode 100644 randdata diff --git a/randdata b/randdata new file mode 100644 index 00000000..e69de29b diff --git a/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java b/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java index 20e604c7..d7021dc1 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java +++ b/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java @@ -346,9 +346,6 @@ public void close() throws IOException { // LZO requires that each file ends with 4 trailing zeroes. If we are here, // the file didn't. It's not critical, though, so log and eat it in this case. LOG.warn("Incorrect LZO file format: file did not end with four trailing zeroes.", e); - } finally{ - //return the decompressor to the pool, the function itself handles null. - CodecPool.returnDecompressor(decompressor); } } } diff --git a/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java b/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java index 88756529..dd106b99 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java +++ b/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java @@ -114,9 +114,6 @@ public void close() throws IOException { indexOut.close(); } closed = true; - //return the compressor to the pool for later reuse; - //the returnCompressor handles nulls. - CodecPool.returnCompressor(compressor); } } From efdf80cdb68969df8dab5bde8779c49d589ac435 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Wed, 3 Dec 2014 18:35:03 -0500 Subject: [PATCH 2/5] Ensure users who don't supply a (de)compressor don't leak (de)compressors --- randdata | 0 .../com/hadoop/compression/lzo/LzopCodec.java | 60 +++++++++++++++++-- 2 files changed, 55 insertions(+), 5 deletions(-) delete mode 100644 randdata diff --git a/randdata b/randdata deleted file mode 100644 index e69de29b..00000000 diff --git a/src/main/java/com/hadoop/compression/lzo/LzopCodec.java b/src/main/java/com/hadoop/compression/lzo/LzopCodec.java index dbc752c2..847a6625 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzopCodec.java +++ b/src/main/java/com/hadoop/compression/lzo/LzopCodec.java @@ -23,6 +23,7 @@ import java.io.InputStream; import java.io.OutputStream; +import com.javafx.tools.doclets.internal.toolkit.util.DocFinder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -67,7 +68,9 @@ public class LzopCodec extends LzoCodec { public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { //get a compressor which will be returned to the pool when the output stream //is closed. - return createOutputStream(out, getCompressor()); + Compressor compressor = getCompressor(); + OutputStream wrapped = new WrappedOutputStream(out, compressor); + return createOutputStream(wrapped, compressor); } public CompressionOutputStream createIndexedOutputStream(OutputStream out, @@ -75,7 +78,9 @@ public CompressionOutputStream createIndexedOutputStream(OutputStream out, throws IOException { //get a compressor which will be returned to the pool when the output stream //is closed. - return createIndexedOutputStream(out, indexOut, getCompressor()); + Compressor compressor = getCompressor(); + OutputStream wrapped = new WrappedOutputStream(out, compressor); + return createIndexedOutputStream(wrapped, indexOut, compressor); } @Override @@ -106,11 +111,56 @@ public CompressionInputStream createInputStream(InputStream in, getConf().getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE)); } + private static class WrappedInputStream extends InputStream { + private InputStream inputStream; + private Decompressor decompressor; + + public WrappedInputStream(InputStream inputStream, Decompressor decompressor) { + this.inputStream = inputStream; + this.decompressor = decompressor; + } + + @Override + public int read() throws IOException { + return inputStream.read(); + } + + @Override + public void close() throws IOException { + CodecPool.returnDecompressor(decompressor); + inputStream.close(); + } + } + + // Previous versions of the API accidentally added/removed compressor/decompressors from the pool + // when they shouldn't. These classes are kind of a hack to maintain existing behavior, + // while still allowing proper resource management from outside + private static class WrappedOutputStream extends OutputStream { + OutputStream outputStream; + Compressor compressor; + + public WrappedOutputStream(OutputStream outputStream, Compressor compressor) { + this.outputStream = outputStream; + this.compressor = compressor; + } + + @Override + public void write(int b) throws IOException { + outputStream.write(b); + } + + @Override + public void close() throws IOException { + CodecPool.returnCompressor(compressor); + outputStream.close(); + } + } + @Override public CompressionInputStream createInputStream(InputStream in) throws IOException { - // get a decompressor from a pool which will be returned to the pool - // when LzoInputStream is closed - return createInputStream(in, CodecPool.getDecompressor(this)); + Decompressor decompressor = CodecPool.getDecompressor(this); + InputStream inputStream = new WrappedInputStream(in, decompressor); + return createInputStream(inputStream, decompressor); } @Override From cc2c7cd46e6198eb68b0950fbb81ff5a8c063158 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Wed, 3 Dec 2014 18:39:39 -0500 Subject: [PATCH 3/5] Fixup bogus imports --- src/main/java/com/hadoop/compression/lzo/LzopCodec.java | 1 - src/main/java/com/hadoop/compression/lzo/LzopInputStream.java | 1 - src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java | 1 - 3 files changed, 3 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/LzopCodec.java b/src/main/java/com/hadoop/compression/lzo/LzopCodec.java index 847a6625..fbeb09bc 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzopCodec.java +++ b/src/main/java/com/hadoop/compression/lzo/LzopCodec.java @@ -23,7 +23,6 @@ import java.io.InputStream; import java.io.OutputStream; -import com.javafx.tools.doclets.internal.toolkit.util.DocFinder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; diff --git a/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java b/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java index d7021dc1..53efd8ed 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java +++ b/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java @@ -31,7 +31,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.compress.BlockDecompressorStream; -import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.Decompressor; public class LzopInputStream extends BlockDecompressorStream { diff --git a/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java b/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java index dd106b99..eff4063c 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java +++ b/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java @@ -25,7 +25,6 @@ import java.util.zip.Adler32; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressorStream; import org.apache.hadoop.io.compress.Compressor; From 9ae0ca5170a6a4355143f52e04f10cdb04e1f9a4 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Thu, 4 Dec 2014 12:03:53 -0500 Subject: [PATCH 4/5] Incorporate @rangadi's feedback --- .../com/hadoop/compression/lzo/LzopCodec.java | 54 +++++++------------ 1 file changed, 18 insertions(+), 36 deletions(-) diff --git a/src/main/java/com/hadoop/compression/lzo/LzopCodec.java b/src/main/java/com/hadoop/compression/lzo/LzopCodec.java index fbeb09bc..15234323 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzopCodec.java +++ b/src/main/java/com/hadoop/compression/lzo/LzopCodec.java @@ -18,10 +18,7 @@ package com.hadoop.compression.lzo; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.io.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CodecPool; @@ -110,55 +107,40 @@ public CompressionInputStream createInputStream(InputStream in, getConf().getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE)); } - private static class WrappedInputStream extends InputStream { - private InputStream inputStream; - private Decompressor decompressor; - - public WrappedInputStream(InputStream inputStream, Decompressor decompressor) { - this.inputStream = inputStream; - this.decompressor = decompressor; - } - - @Override - public int read() throws IOException { - return inputStream.read(); - } - - @Override - public void close() throws IOException { - CodecPool.returnDecompressor(decompressor); - inputStream.close(); - } - } - // Previous versions of the API accidentally added/removed compressor/decompressors from the pool - // when they shouldn't. These classes are kind of a hack to maintain existing behavior, + // when they shouldn't. This classs is kind of a hack to maintain existing behavior, // while still allowing proper resource management from outside - private static class WrappedOutputStream extends OutputStream { - OutputStream outputStream; - Compressor compressor; + private static class WrappedOutputStream extends FilterOutputStream { + private Compressor compressor; public WrappedOutputStream(OutputStream outputStream, Compressor compressor) { - this.outputStream = outputStream; + super(outputStream); this.compressor = compressor; } @Override - public void write(int b) throws IOException { - outputStream.write(b); + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); } @Override public void close() throws IOException { CodecPool.returnCompressor(compressor); - outputStream.close(); + super.close(); } } @Override - public CompressionInputStream createInputStream(InputStream in) throws IOException { - Decompressor decompressor = CodecPool.getDecompressor(this); - InputStream inputStream = new WrappedInputStream(in, decompressor); + public CompressionInputStream createInputStream(final InputStream in) throws IOException { + final Decompressor decompressor = CodecPool.getDecompressor(this); + // maintain backwards compatibility re: returning the decompressor to the CodecPool + InputStream inputStream = new FilterInputStream(in) { + @Override + public void close() throws IOException { + CodecPool.returnDecompressor(decompressor); + super.close(); + } + }; return createInputStream(inputStream, decompressor); } From 72d71d15de6bdb396a2dc3f330bff63bc00941c9 Mon Sep 17 00:00:00 2001 From: Ian Hummel Date: Thu, 4 Dec 2014 12:21:44 -0500 Subject: [PATCH 5/5] Fix up imports --- src/main/java/com/hadoop/compression/lzo/LzopCodec.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/hadoop/compression/lzo/LzopCodec.java b/src/main/java/com/hadoop/compression/lzo/LzopCodec.java index 15234323..21a181e9 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzopCodec.java +++ b/src/main/java/com/hadoop/compression/lzo/LzopCodec.java @@ -18,7 +18,12 @@ package com.hadoop.compression.lzo; -import java.io.*; +import java.io.DataOutputStream; +import java.io.FilterInputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CodecPool;