diff --git a/src/main/java/com/hadoop/compression/lzo/LzopCodec.java b/src/main/java/com/hadoop/compression/lzo/LzopCodec.java index dbc752c2..21a181e9 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzopCodec.java +++ b/src/main/java/com/hadoop/compression/lzo/LzopCodec.java @@ -19,6 +19,8 @@ package com.hadoop.compression.lzo; import java.io.DataOutputStream; +import java.io.FilterInputStream; +import java.io.FilterOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -67,7 +69,9 @@ public class LzopCodec extends LzoCodec { public CompressionOutputStream createOutputStream(OutputStream out) throws IOException { //get a compressor which will be returned to the pool when the output stream //is closed. - return createOutputStream(out, getCompressor()); + Compressor compressor = getCompressor(); + OutputStream wrapped = new WrappedOutputStream(out, compressor); + return createOutputStream(wrapped, compressor); } public CompressionOutputStream createIndexedOutputStream(OutputStream out, @@ -75,7 +79,9 @@ public CompressionOutputStream createIndexedOutputStream(OutputStream out, throws IOException { //get a compressor which will be returned to the pool when the output stream //is closed. - return createIndexedOutputStream(out, indexOut, getCompressor()); + Compressor compressor = getCompressor(); + OutputStream wrapped = new WrappedOutputStream(out, compressor); + return createIndexedOutputStream(wrapped, indexOut, compressor); } @Override @@ -106,11 +112,41 @@ public CompressionInputStream createInputStream(InputStream in, getConf().getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE)); } + // Previous versions of the API accidentally added/removed compressor/decompressors from the pool + // when they shouldn't. This classs is kind of a hack to maintain existing behavior, + // while still allowing proper resource management from outside + private static class WrappedOutputStream extends FilterOutputStream { + private Compressor compressor; + + public WrappedOutputStream(OutputStream outputStream, Compressor compressor) { + super(outputStream); + this.compressor = compressor; + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + + @Override + public void close() throws IOException { + CodecPool.returnCompressor(compressor); + super.close(); + } + } + @Override - public CompressionInputStream createInputStream(InputStream in) throws IOException { - // get a decompressor from a pool which will be returned to the pool - // when LzoInputStream is closed - return createInputStream(in, CodecPool.getDecompressor(this)); + public CompressionInputStream createInputStream(final InputStream in) throws IOException { + final Decompressor decompressor = CodecPool.getDecompressor(this); + // maintain backwards compatibility re: returning the decompressor to the CodecPool + InputStream inputStream = new FilterInputStream(in) { + @Override + public void close() throws IOException { + CodecPool.returnDecompressor(decompressor); + super.close(); + } + }; + return createInputStream(inputStream, decompressor); } @Override diff --git a/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java b/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java index 20e604c7..53efd8ed 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java +++ b/src/main/java/com/hadoop/compression/lzo/LzopInputStream.java @@ -31,7 +31,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.compress.BlockDecompressorStream; -import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.Decompressor; public class LzopInputStream extends BlockDecompressorStream { @@ -346,9 +345,6 @@ public void close() throws IOException { // LZO requires that each file ends with 4 trailing zeroes. If we are here, // the file didn't. It's not critical, though, so log and eat it in this case. LOG.warn("Incorrect LZO file format: file did not end with four trailing zeroes.", e); - } finally{ - //return the decompressor to the pool, the function itself handles null. - CodecPool.returnDecompressor(decompressor); } } } diff --git a/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java b/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java index 88756529..eff4063c 100644 --- a/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java +++ b/src/main/java/com/hadoop/compression/lzo/LzopOutputStream.java @@ -25,7 +25,6 @@ import java.util.zip.Adler32; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressorStream; import org.apache.hadoop.io.compress.Compressor; @@ -114,9 +113,6 @@ public void close() throws IOException { indexOut.close(); } closed = true; - //return the compressor to the pool for later reuse; - //the returnCompressor handles nulls. - CodecPool.returnCompressor(compressor); } }