Go: add support for custom compression
Adds support for bringing custom compressors and decompressors to the Go
lexer and writer.

For the writer, a ResettableWriteCloser is accepted. For the reader, a
map of compression format to ResettableReader is accepted. If the reader
also implements io.Closer, we'll call that when the lexer is closed.
Wyatt Alt committed Sep 8, 2023
1 parent 4ac8e90 commit af98e5a
Showing 5 changed files with 144 additions and 12 deletions.
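
As a usage sketch (not part of the commit; written in-package like the tests below, with error handling elided, and relying on the pierrec/lz4 types, whose Reset methods already fit the new options):

out := &bytes.Buffer{}
lzw := lz4.NewWriter(nil) // NewWriter resets it onto the internal chunk buffer
w, _ := NewWriter(out, &WriterOptions{
    Chunked:     true,
    Compression: "lz4", // a format name is still required alongside a custom Compressor
    Compressor:  lzw,
})
// ... write header, schemas, channels, and messages, then w.Close() ...

lzr := lz4.NewReader(nil) // the lexer resets it onto each chunk's data
lexer, _ := NewLexer(bytes.NewReader(out.Bytes()), &LexerOptions{
    Decompressors: map[CompressionFormat]ResettableReader{
        CompressionLZ4: lzr,
    },
})
// ... call lexer.Next(nil) in a loop ...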
2 changes: 1 addition & 1 deletion go/mcap/Makefile
@@ -1,5 +1,5 @@
 test:
-    go test ./...
+    go test -cover ./...

 bench:
     go test -benchmem -run=^$$ -count 5 -bench ^BenchmarkLexer/demo -memprofile mem.out -cpuprofile cpu.out
28 changes: 24 additions & 4 deletions go/mcap/lexer.go
@@ -158,6 +158,7 @@ type Lexer struct {
    maxRecordSize            int
    maxDecompressedChunkSize int
    attachmentCallback       func(*AttachmentReader) error
+   decompressors            map[CompressionFormat]ResettableReader
}

// Next returns the next token from the lexer as a byte array. The result will
@@ -302,6 +303,11 @@ func (l *Lexer) Close() {
    if l.decoders.zstd != nil {
        l.decoders.zstd.Close()
    }
+   for _, decompressor := range l.decompressors {
+       if closer, ok := decompressor.(io.Closer); ok {
+           closer.Close()
+       }
+   }
}

type decoders struct {
@@ -414,15 +420,19 @@ func loadChunk(l *Lexer, recordLen uint64) error {

    // remaining bytes in the record are the chunk data
    lr := io.LimitReader(l.reader, int64(recordsLength))
-   switch compression {
-   case CompressionNone:
+   switch {
+   case l.decompressors[compression] != nil: // must be top
+       decoder := l.decompressors[compression]
+       decoder.Reset(lr)
+       l.reader = decoder
+   case compression == CompressionNone:
        l.reader = lr
-   case CompressionZSTD:
+   case compression == CompressionZSTD:
        err = l.setZSTDDecoder(lr)
        if err != nil {
            return err
        }
-   case CompressionLZ4:
+   case compression == CompressionLZ4:
        l.setLZ4Decoder(lr)
    default:
        return fmt.Errorf("unsupported compression: %s", string(compression))
@@ -498,13 +508,20 @@ type LexerOptions struct {
    MaxRecordSize int
    // AttachmentCallback is a function to execute on attachments encountered in the file.
    AttachmentCallback func(*AttachmentReader) error
+   // Decompressors are custom decompressors. Chunks matching the supplied
+   // compression format will be decompressed with the provided
+   // ResettableReader instead of the default implementation. If the
+   // ResettableReader also implements io.Closer, Close will be called on close
+   // of the reader.
+   Decompressors map[CompressionFormat]ResettableReader
}

// NewLexer returns a new lexer for the given reader.
func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
    var maxRecordSize, maxDecompressedChunkSize int
    var computeAttachmentCRCs, validateChunkCRCs, emitChunks, emitInvalidChunks, skipMagic bool
    var attachmentCallback func(*AttachmentReader) error
+   var decompressors map[CompressionFormat]ResettableReader
    if len(opts) > 0 {
        validateChunkCRCs = opts[0].ValidateChunkCRCs
        computeAttachmentCRCs = opts[0].ComputeAttachmentCRCs
@@ -514,13 +531,15 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
        maxRecordSize = opts[0].MaxRecordSize
        maxDecompressedChunkSize = opts[0].MaxDecompressedChunkSize
        attachmentCallback = opts[0].AttachmentCallback
+       decompressors = opts[0].Decompressors
    }
    if !skipMagic {
        err := validateMagic(r)
        if err != nil {
            return nil, err
        }
    }
+
    return &Lexer{
        basereader: r,
        reader:     r,
@@ -532,5 +551,6 @@ func NewLexer(r io.Reader, opts ...*LexerOptions) (*Lexer, error) {
        maxRecordSize:            maxRecordSize,
        maxDecompressedChunkSize: maxDecompressedChunkSize,
        attachmentCallback:       attachmentCallback,
+       decompressors:            decompressors,
    }, nil
}
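
The ResettableReader definition itself is not part of this diff. From the way loadChunk calls decoder.Reset(lr) and then reads from it, and Close type-asserts io.Closer, the contract is presumably an io.Reader with a Reset hook. Below is a sketch of that presumed shape, together with a minimal pass-through implementation:

// Presumed shape of ResettableReader, inferred from its use above;
// the repository's actual declaration may differ.
type ResettableReader interface {
    io.Reader
    Reset(io.Reader)
}

// nopDecompressor is the minimal custom decompressor: it passes chunk
// bytes through unchanged. A real codec would decode inside Read.
type nopDecompressor struct{ r io.Reader }

func (n *nopDecompressor) Read(p []byte) (int, error) { return n.r.Read(p) }
func (n *nopDecompressor) Reset(r io.Reader)          { n.r = r }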
38 changes: 38 additions & 0 deletions go/mcap/lexer_test.go
@@ -11,6 +11,7 @@ import (
    "testing"
    "time"

+   "github.com/pierrec/lz4/v4"
    "github.com/stretchr/testify/assert"
)
@@ -134,6 +135,43 @@ func TestBadMagic(t *testing.T) {
    }
}

+func TestCustomDecompressor(t *testing.T) {
+   buf := file(
+       header(),
+       chunk(t, CompressionLZ4, true, channelInfo(), message(), message()),
+       chunk(t, CompressionLZ4, true, channelInfo(), message(), message()),
+       attachment(), attachment(),
+       footer(),
+   )
+   lzr := lz4.NewReader(nil)
+   blockCount := 0
+   lzr.Apply(lz4.OnBlockDoneOption(func(size int) {
+       blockCount++
+   }))
+   lexer, err := NewLexer(bytes.NewReader(buf), &LexerOptions{
+       Decompressors: map[CompressionFormat]ResettableReader{
+           CompressionLZ4: lzr,
+       },
+   })
+   assert.Nil(t, err)
+   expected := []TokenType{
+       TokenHeader,
+       TokenChannel,
+       TokenMessage,
+       TokenMessage,
+       TokenChannel,
+       TokenMessage,
+       TokenMessage,
+       TokenFooter,
+   }
+   for i, expectedTokenType := range expected {
+       tokenType, _, err := lexer.Next(nil)
+       assert.Nil(t, err)
+       assert.Equal(t, expectedTokenType, tokenType, fmt.Sprintf("mismatch element %d", i))
+   }
+   assert.Positive(t, blockCount)
+}
+
func TestReturnsEOFOnSuccessiveCalls(t *testing.T) {
    lexer, err := NewLexer(bytes.NewReader(file()))
    assert.Nil(t, err)
25 changes: 18 additions & 7 deletions go/mcap/writer.go
Expand Up @@ -788,6 +788,10 @@ type WriterOptions struct {
// SkipMagic causes the writer to skip writing magic bytes at the start of
// the file. This may be useful for writing a partial section of records.
SkipMagic bool

// Compressor is a custom compressor. If supplied it will take precedence
// over the built-in ones.
Compressor ResettableWriteCloser
}

// Convert an MCAP compression level to the corresponding lz4.CompressionLevel.
@@ -833,27 +837,34 @@ func NewWriter(w io.Writer, opts *WriterOptions) (*Writer, error) {
    compressed := bytes.Buffer{}
    var compressedWriter *countingCRCWriter
    if opts.Chunked {
-       switch opts.Compression {
-       case CompressionZSTD:
+       switch {
+       // custom compressor takes precedence.
+       case opts.Compressor != nil:
+           if opts.Compression == "" {
+               return nil, fmt.Errorf("custom compressor requires compression format")
+           }
+           opts.Compressor.Reset(&compressed)
+           compressedWriter = newCountingCRCWriter(opts.Compressor, opts.IncludeCRC)
+       case opts.Compression == CompressionZSTD:
            level := encoderLevelFromZstd(opts.CompressionLevel)
            zw, err := zstd.NewWriter(&compressed, zstd.WithEncoderLevel(level))
            if err != nil {
                return nil, err
            }
            compressedWriter = newCountingCRCWriter(zw, opts.IncludeCRC)
-       case CompressionLZ4:
+       case opts.Compression == CompressionLZ4:
            level := encoderLevelFromLZ4(opts.CompressionLevel)
            lzw := lz4.NewWriter(&compressed)
            _ = lzw.Apply(lz4.CompressionLevelOption(level))
            compressedWriter = newCountingCRCWriter(lzw, opts.IncludeCRC)
-       case CompressionNone:
+       case opts.Compression == CompressionNone:
            compressedWriter = newCountingCRCWriter(bufCloser{&compressed}, opts.IncludeCRC)
        default:
            return nil, fmt.Errorf("unsupported compression")
        }
-       if opts.ChunkSize == 0 {
-           opts.ChunkSize = 1024 * 1024
-       }
    }
+   if opts.ChunkSize == 0 {
+       opts.ChunkSize = 1024 * 1024
+   }
    return &Writer{
        w: writer,
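On the write side, NewWriter only ever calls Reset(&compressed) on the custom compressor before wrapping it in newCountingCRCWriter, which needs write and close behavior. The presumed contract is therefore (again a sketch, not the repository's actual declaration):

// Presumed shape of ResettableWriteCloser, inferred from its use in NewWriter.
type ResettableWriteCloser interface {
    io.WriteCloser
    Reset(io.Writer)
}

Note that a non-empty Compression format is still required with a custom Compressor: the format name is recorded in each chunk, which is how a reader later selects a matching decompressor. That is why NewWriter rejects an empty format above.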
63 changes: 63 additions & 0 deletions go/mcap/writer_test.go
Expand Up @@ -4,9 +4,11 @@ import (
"bytes"
"crypto/md5"
"fmt"
"io"
"testing"
"time"

"github.com/pierrec/lz4/v4"
"github.com/stretchr/testify/assert"
)

@@ -678,3 +680,64 @@ func TestWriteAttachment(t *testing.T) {
        })
    }
}
+
+func assertReadable(t *testing.T, rs io.ReadSeeker) {
+   reader, err := NewReader(rs)
+   assert.Nil(t, err)
+
+   _, err = reader.Info()
+   assert.Nil(t, err)
+
+   it, err := reader.Messages()
+   assert.Nil(t, err)
+   for {
+       _, _, _, err := it.Next(nil)
+       if err != nil {
+           assert.ErrorIs(t, err, io.EOF)
+           break
+       }
+   }
+}
+
+func TestBYOCompressor(t *testing.T) {
+   buf := &bytes.Buffer{}
+   // example - custom lz4 settings
+   lzw := lz4.NewWriter(nil)
+   blockCount := 0
+   lzw.Apply(lz4.OnBlockDoneOption(func(size int) {
+       blockCount++
+   }))
+
+   writer, err := NewWriter(buf, &WriterOptions{
+       Chunked:     true,
+       ChunkSize:   1024,
+       Compressor:  lzw,
+       Compression: "lz4",
+   })
+   assert.Nil(t, err)
+
+   assert.Nil(t, writer.WriteHeader(&Header{}))
+   assert.Nil(t, writer.WriteSchema(&Schema{
+       ID:       1,
+       Name:     "schema",
+       Encoding: "ros1msg",
+       Data:     []byte{},
+   }))
+   assert.Nil(t, writer.WriteChannel(&Channel{
+       ID:              0,
+       SchemaID:        1,
+       Topic:           "/foo",
+       MessageEncoding: "ros1msg",
+   }))
+
+   for i := 0; i < 100; i++ {
+       assert.Nil(t, writer.WriteMessage(&Message{
+           ChannelID: 0,
+           Sequence:  0,
+           LogTime:   uint64(i),
+       }))
+   }
+   assert.Nil(t, writer.Close())
+   assertReadable(t, bytes.NewReader(buf.Bytes()))
+   assert.Positive(t, blockCount)
+}
