Skip to content

Commit

Permalink
Merge pull request #103 from themodernlife/fix-codec-pooling
Browse files Browse the repository at this point in the history
Remove unnecessary calls to CodecPool.returnCompressor/returnDecompresso...
  • Loading branch information
sjlee committed Dec 5, 2014
2 parents ce300c9 + 72d71d1 commit d62701d
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 14 deletions.
48 changes: 42 additions & 6 deletions src/main/java/com/hadoop/compression/lzo/LzopCodec.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package com.hadoop.compression.lzo;

import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand Down Expand Up @@ -67,15 +69,19 @@ public class LzopCodec extends LzoCodec {
public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
//get a compressor which will be returned to the pool when the output stream
//is closed.
return createOutputStream(out, getCompressor());
Compressor compressor = getCompressor();
OutputStream wrapped = new WrappedOutputStream(out, compressor);
return createOutputStream(wrapped, compressor);
}

public CompressionOutputStream createIndexedOutputStream(OutputStream out,
DataOutputStream indexOut)
throws IOException {
//get a compressor which will be returned to the pool when the output stream
//is closed.
return createIndexedOutputStream(out, indexOut, getCompressor());
Compressor compressor = getCompressor();
OutputStream wrapped = new WrappedOutputStream(out, compressor);
return createIndexedOutputStream(wrapped, indexOut, compressor);
}

@Override
Expand Down Expand Up @@ -106,11 +112,41 @@ public CompressionInputStream createInputStream(InputStream in,
getConf().getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE));
}

// Previous versions of the API accidentally added/removed compressor/decompressors from the pool
// when they shouldn't. This class is kind of a hack to maintain existing behavior,
// while still allowing proper resource management from outside
/**
 * Output stream decorator that hands a {@link Compressor} back to the
 * {@link CodecPool} when the stream is closed.
 *
 * <p>It is used by the factory methods that obtain a compressor on the
 * caller's behalf, so that closing the resulting stream still releases that
 * compressor even though the compression stream itself no longer does so.
 */
private static class WrappedOutputStream extends FilterOutputStream {
  /**
   * Compressor still owed to the pool; set to {@code null} once it has been
   * returned so that repeated {@link #close()} calls cannot return it twice.
   */
  private Compressor compressor;

  /**
   * @param outputStream the underlying stream that receives the bytes
   * @param compressor   the compressor to return to the pool on close
   */
  public WrappedOutputStream(OutputStream outputStream, Compressor compressor) {
    super(outputStream);
    this.compressor = compressor;
  }

  /**
   * Forwards the whole range to the underlying stream in a single call
   * instead of going through {@link FilterOutputStream}'s byte-by-byte
   * default.
   */
  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    out.write(b, off, len);
  }

  /**
   * Returns the compressor to the pool (first call only) and then closes the
   * underlying stream.
   *
   * <p>After the first return another stream may already have borrowed the
   * same instance, so returning it again on a second {@code close()} would
   * put an in-use compressor back into the pool.
   */
  @Override
  public void close() throws IOException {
    Compressor pending = compressor;
    // Clear the field before returning so a re-entrant or repeated close is a no-op
    // with respect to the pool.
    compressor = null;
    if (pending != null) {
      CodecPool.returnCompressor(pending);
    }
    super.close();
  }
}

@Override
public CompressionInputStream createInputStream(InputStream in) throws IOException {
// get a decompressor from a pool which will be returned to the pool
// when LzoInputStream is closed
return createInputStream(in, CodecPool.getDecompressor(this));
public CompressionInputStream createInputStream(final InputStream in) throws IOException {
final Decompressor decompressor = CodecPool.getDecompressor(this);
// maintain backwards compatibility re: returning the decompressor to the CodecPool
InputStream inputStream = new FilterInputStream(in) {
@Override
public void close() throws IOException {
CodecPool.returnDecompressor(decompressor);
super.close();
}
};
return createInputStream(inputStream, decompressor);
}

@Override
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/com/hadoop/compression/lzo/LzopInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Decompressor;

public class LzopInputStream extends BlockDecompressorStream {
Expand Down Expand Up @@ -346,9 +345,6 @@ public void close() throws IOException {
// LZO requires that each file ends with 4 trailing zeroes. If we are here,
// the file didn't. It's not critical, though, so log and eat it in this case.
LOG.warn("Incorrect LZO file format: file did not end with four trailing zeroes.", e);
} finally{
//return the decompressor to the pool, the function itself handles null.
CodecPool.returnDecompressor(decompressor);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.zip.Adler32;

import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressorStream;
import org.apache.hadoop.io.compress.Compressor;

Expand Down Expand Up @@ -114,9 +113,6 @@ public void close() throws IOException {
indexOut.close();
}
closed = true;
//return the compressor to the pool for later reuse;
//the returnCompressor handles nulls.
CodecPool.returnCompressor(compressor);
}
}

Expand Down

0 comments on commit d62701d

Please sign in to comment.