diff --git a/src/main/java/org/janelia/saalfeldlab/n5/blosc/BloscCompression.java b/src/main/java/org/janelia/saalfeldlab/n5/blosc/BloscCompression.java index c21cfd7..fde7510 100644 --- a/src/main/java/org/janelia/saalfeldlab/n5/blosc/BloscCompression.java +++ b/src/main/java/org/janelia/saalfeldlab/n5/blosc/BloscCompression.java @@ -25,6 +25,8 @@ */ package org.janelia.saalfeldlab.n5.blosc; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -38,6 +40,7 @@ import org.janelia.saalfeldlab.n5.DataBlock; import org.janelia.saalfeldlab.n5.DefaultBlockReader; import org.janelia.saalfeldlab.n5.DefaultBlockWriter; +import org.janelia.saalfeldlab.n5.codec.Codec; /** * Compression using JBlosc (https://github.com/Blosc/JBlosc) compressors. @@ -45,7 +48,8 @@ * @author Stephan Saalfeld <saalfelds@janelia.hhmi.org> */ @CompressionType("blosc") -public class BloscCompression implements DefaultBlockReader, DefaultBlockWriter, Compression { +public class BloscCompression implements DefaultBlockReader, DefaultBlockWriter, Compression, Codec +{ public static final int NOSHUFFLE = 0; public static final int SHUFFLE = 1; @@ -188,9 +192,9 @@ public BloscCompression getWriter() { * {@link JBlosc} decompresses from and into {@link ByteBuffer}. */ @Override - public OutputStream getOutputStream(final OutputStream out) throws IOException { + public InputStream decode(InputStream in) throws IOException { - return null; + return decompressBlosc( in ); } /** @@ -200,6 +204,93 @@ public OutputStream getOutputStream(final OutputStream out) throws IOException { @Override public InputStream getInputStream(final InputStream in) throws IOException { - return null; + return decompressBlosc( in ); + } + + @Override + public OutputStream encode( final OutputStream out ) + { + return new CompressibleByteArrayOutputStream( out ); + } + + @Override + public OutputStream getOutputStream(final OutputStream out) { + return new CompressibleByteArrayOutputStream( out ); + } + + @Override + public String getType() + { + return "blosc"; + } + + private ByteArrayInputStream decompressBlosc( final InputStream in ) throws IOException + { + final ByteArrayOutputStream buf = new ByteArrayOutputStream(); + for (int result = in.read(); result != -1; result = in.read()) + buf.write((byte) result); + + final ByteBuffer src = ByteBuffer.wrap(buf.toByteArray()); + final BufferSizes sizes = blosc.cbufferSizes(src); + final int dstSize = (int)sizes.getNbytes(); + byte[] dstArray = new byte[dstSize]; + ByteBuffer dst = ByteBuffer.wrap(dstArray); + + JBlosc.decompressCtx(src, dst, dst.capacity(), nthreads); + return new ByteArrayInputStream( dstArray ); + } + + public class CompressibleByteArrayOutputStream extends ByteArrayOutputStream { + + private final OutputStream out; + + // Constructor to initialize with a decorated OutputStream + public CompressibleByteArrayOutputStream(OutputStream out) { + super(); + this.out = out; + } + + @Override + public synchronized void write(byte[] b, int off, int len) { + + try + { + // Wrap the input data into a ByteBuffer + ByteBuffer src = ByteBuffer.wrap(b, off, len); + ByteBuffer dst = ByteBuffer.allocate(src.limit() + JBlosc.OVERHEAD); + JBlosc.compressCtx(clevel, shuffle, 1, src, src.limit(), dst, dst.limit(), cname, blocksize, nthreads); + // Write the compressed data to the decorated OutputStream + out.write(dst.array(), dst.position(), dst.remaining()); + } + catch ( IOException e ) + { + throw new BloscCompressionException( e.getMessage() ); + } + } + + // Flush the decorated OutputStream + @Override + public void flush() throws IOException { + super.flush(); + if ( out != null) { + out.flush(); + } + } + + // Close both the ByteArrayOutputStream and the decorated OutputStream + @Override + public void close() throws IOException { + super.close(); + if ( out != null) { + out.close(); + } + } + } + + public static class BloscCompressionException extends RuntimeException { + + public BloscCompressionException(final String message) { + super(message); + } } } diff --git a/src/test/java/org/janelia/saalfeldlab/n5/blosc/BloscCompressionTest.java b/src/test/java/org/janelia/saalfeldlab/n5/blosc/BloscCompressionTest.java index 7e85180..dcd3268 100644 --- a/src/test/java/org/janelia/saalfeldlab/n5/blosc/BloscCompressionTest.java +++ b/src/test/java/org/janelia/saalfeldlab/n5/blosc/BloscCompressionTest.java @@ -25,16 +25,23 @@ */ package org.janelia.saalfeldlab.n5.blosc; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.fail; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.lang.reflect.Field; import java.net.URI; import java.net.URISyntaxException; import java.nio.file.Files; import java.util.Arrays; import java.util.Map; +import java.util.Random; +import java.util.stream.IntStream; import org.blosc.JBlosc; import org.janelia.saalfeldlab.n5.AbstractN5Test; @@ -45,7 +52,9 @@ import org.janelia.saalfeldlab.n5.N5FSWriter; import org.janelia.saalfeldlab.n5.N5Reader; import org.janelia.saalfeldlab.n5.N5Writer; +import org.janelia.saalfeldlab.n5.codec.Codec; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import com.google.gson.GsonBuilder; @@ -169,4 +178,40 @@ public void testDefaultNThreads() throws IOException, URISyntaxException { } } + @Ignore("This unit test is ignored, since it can only be run on a machine with libblosc installed.") + @Test + public void testBloscCompressionEncodeDecode() throws IOException + { + Random random = new Random(); + + int n = 16; + byte[] inputData = new byte[n]; + IntStream.range(0, n).forEach( i -> { + inputData[i] = (byte)(random.nextInt()); + }); + System.out.println("input data:" + Arrays.toString(inputData)); + + // encode + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + Codec.BytesCodec codec = new BloscCompression(); + OutputStream encodedOutputStream = codec.encode(outputStream); + encodedOutputStream.write(inputData); + encodedOutputStream.flush(); + encodedOutputStream.close(); + + byte[] encodedData = outputStream.toByteArray(); + System.out.println( "encoded data: " + Arrays.toString(encodedData)); + + // decode + ByteArrayInputStream is = new ByteArrayInputStream(encodedData); + InputStream decodedIs = codec.decode(is); + byte[] decodedData = new byte[n]; + int bytes = decodedIs.read(decodedData); + decodedIs.close(); + + System.out.println("read bytes:" + bytes); + System.out.println("decoded data:" + Arrays.toString(decodedData)); + + assertArrayEquals( inputData, decodedData ); + } }