Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blosc codec implementation #10

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
*/
package org.janelia.saalfeldlab.n5.blosc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -38,14 +40,16 @@
import org.janelia.saalfeldlab.n5.DataBlock;
import org.janelia.saalfeldlab.n5.DefaultBlockReader;
import org.janelia.saalfeldlab.n5.DefaultBlockWriter;
import org.janelia.saalfeldlab.n5.codec.Codec;

/**
* Compression using JBlosc (https://github.com/Blosc/JBlosc) compressors.
*
* @author Stephan Saalfeld <[email protected]>
*/
@CompressionType("blosc")
public class BloscCompression implements DefaultBlockReader, DefaultBlockWriter, Compression {
public class BloscCompression implements DefaultBlockReader, DefaultBlockWriter, Compression, Codec
{

public static final int NOSHUFFLE = 0;
public static final int SHUFFLE = 1;
Expand Down Expand Up @@ -188,9 +192,9 @@ public BloscCompression getWriter() {
* {@link JBlosc} decompresses from and into {@link ByteBuffer}.
*/
@Override
public OutputStream getOutputStream(final OutputStream out) throws IOException {
public InputStream decode(InputStream in) throws IOException {

return null;
return decompressBlosc( in );
}

/**
Expand All @@ -200,6 +204,93 @@ public OutputStream getOutputStream(final OutputStream out) throws IOException {
@Override
public InputStream getInputStream(final InputStream in) throws IOException {

return null;
return decompressBlosc( in );
}

@Override
public OutputStream encode( final OutputStream out )
{
return new CompressibleByteArrayOutputStream( out );
}

@Override
public OutputStream getOutputStream(final OutputStream out) {
return new CompressibleByteArrayOutputStream( out );
}

@Override
public String getType()
{
return "blosc";
}

private ByteArrayInputStream decompressBlosc( final InputStream in ) throws IOException
{
final ByteArrayOutputStream buf = new ByteArrayOutputStream();
for (int result = in.read(); result != -1; result = in.read())
buf.write((byte) result);

final ByteBuffer src = ByteBuffer.wrap(buf.toByteArray());
final BufferSizes sizes = blosc.cbufferSizes(src);
final int dstSize = (int)sizes.getNbytes();
byte[] dstArray = new byte[dstSize];
ByteBuffer dst = ByteBuffer.wrap(dstArray);

JBlosc.decompressCtx(src, dst, dst.capacity(), nthreads);
return new ByteArrayInputStream( dstArray );
}

public class CompressibleByteArrayOutputStream extends ByteArrayOutputStream {

private final OutputStream out;

// Constructor to initialize with a decorated OutputStream
public CompressibleByteArrayOutputStream(OutputStream out) {
super();
this.out = out;
}

@Override
public synchronized void write(byte[] b, int off, int len) {

try
{
// Wrap the input data into a ByteBuffer
ByteBuffer src = ByteBuffer.wrap(b, off, len);
ByteBuffer dst = ByteBuffer.allocate(src.limit() + JBlosc.OVERHEAD);
JBlosc.compressCtx(clevel, shuffle, 1, src, src.limit(), dst, dst.limit(), cname, blocksize, nthreads);
// Write the compressed data to the decorated OutputStream
out.write(dst.array(), dst.position(), dst.remaining());
}
catch ( IOException e )
{
throw new BloscCompressionException( e.getMessage() );
}
}

// Flush the decorated OutputStream
@Override
public void flush() throws IOException {
super.flush();
if ( out != null) {
out.flush();
}
}

// Close both the ByteArrayOutputStream and the decorated OutputStream
@Override
public void close() throws IOException {
super.close();
if ( out != null) {
out.close();
}
}
}

public static class BloscCompressionException extends RuntimeException {

public BloscCompressionException(final String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,23 @@
*/
package org.janelia.saalfeldlab.n5.blosc;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import java.util.stream.IntStream;

import org.blosc.JBlosc;
import org.janelia.saalfeldlab.n5.AbstractN5Test;
Expand All @@ -45,7 +52,9 @@
import org.janelia.saalfeldlab.n5.N5FSWriter;
import org.janelia.saalfeldlab.n5.N5Reader;
import org.janelia.saalfeldlab.n5.N5Writer;
import org.janelia.saalfeldlab.n5.codec.Codec;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import com.google.gson.GsonBuilder;
Expand Down Expand Up @@ -169,4 +178,40 @@ public void testDefaultNThreads() throws IOException, URISyntaxException {
}
}

@Ignore("This unit test is ignored, since it can only be run on a machine with libblosc installed.")
@Test
public void testBloscCompressionEncodeDecode() throws IOException
{
Random random = new Random();

int n = 16;
byte[] inputData = new byte[n];
IntStream.range(0, n).forEach( i -> {
inputData[i] = (byte)(random.nextInt());
});
System.out.println("input data:" + Arrays.toString(inputData));

// encode
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Codec.BytesCodec codec = new BloscCompression();
OutputStream encodedOutputStream = codec.encode(outputStream);
encodedOutputStream.write(inputData);
encodedOutputStream.flush();
encodedOutputStream.close();

byte[] encodedData = outputStream.toByteArray();
System.out.println( "encoded data: " + Arrays.toString(encodedData));

// decode
ByteArrayInputStream is = new ByteArrayInputStream(encodedData);
InputStream decodedIs = codec.decode(is);
byte[] decodedData = new byte[n];
int bytes = decodedIs.read(decodedData);
decodedIs.close();

System.out.println("read bytes:" + bytes);
System.out.println("decoded data:" + Arrays.toString(decodedData));

assertArrayEquals( inputData, decodedData );
}
}