Compressor Optimizer #367

Open
wants to merge 9 commits into base: main
108 changes: 74 additions & 34 deletions prover/lib/compressor/blob/v1/blob_maker.go
@@ -4,18 +4,16 @@ import (
"bytes"
"errors"
"fmt"
"github.com/consensys/compress/lzss"
fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
"github.com/consensys/linea-monorepo/prover/lib/compressor/blob/dictionary"
"github.com/consensys/linea-monorepo/prover/lib/compressor/blob/encode"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
"github.com/sirupsen/logrus"
"os"
"slices"
"strings"

fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
"github.com/sirupsen/logrus"

"github.com/consensys/compress/lzss"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
)

const (
@@ -38,7 +36,7 @@ type BlobMaker struct {
dict []byte // dictionary used for compression
dictStore dictionary.Store // dictionary store comprising only dict, used for decompression sanity checks

Header Header
header Header

// contains currentBlob data from latest **valid** call to Write
// that is the header (uncompressed) and the body (compressed)
@@ -73,7 +71,7 @@ func NewBlobMaker(dataLimit int, dictPath string) (*BlobMaker, error) {
if err != nil {
return nil, err
}
copy(blobMaker.Header.DictChecksum[:], dictChecksum)
copy(blobMaker.header.DictChecksum[:], dictChecksum)

blobMaker.compressor, err = lzss.NewCompressor(dict)
if err != nil {
@@ -88,18 +86,18 @@

// StartNewBatch starts a new batch of blocks.
func (bm *BlobMaker) StartNewBatch() {
bm.Header.sealBatch()
bm.header.sealBatch()
}

// Reset resets the bm to its initial state.
func (bm *BlobMaker) Reset() {
bm.Header.resetTable()
bm.header.resetTable()
bm.currentBlobLength = 0
bm.buf.Reset()
bm.packBuffer.Reset()
bm.compressor.Reset()

bm.StartNewBatch()
bm.header.sealBatch()
}

// Len returns the length of the compressed data, which includes the header.
@@ -120,14 +118,14 @@ func (bm *BlobMaker) Bytes() []byte {
if err != nil {
var sbb strings.Builder
fmt.Fprintf(&sbb, "invalid blob: %v\n", err)
fmt.Fprintf(&sbb, "header: %v\n", bm.Header)
fmt.Fprintf(&sbb, "header: %v\n", bm.header)
fmt.Fprintf(&sbb, "bm.currentBlobLength: %v\n", bm.currentBlobLength)
fmt.Fprintf(&sbb, "bm.currentBlob: %x\n", bm.currentBlob[:bm.currentBlobLength])

panic(sbb.String())
}
// compare the header
if !header.Equals(&bm.Header) {
if !header.Equals(&bm.header) {
panic("invalid blob: header mismatch")
}
if !bytes.Equal(rawBlocks, bm.compressor.WrittenBytes()) {
@@ -140,6 +138,7 @@
// Write attempts to append the RLP block to the current batch.
// If forceReset is set, this will NOT append the bytes but still returns true if the chunk could have been appended.
func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error) {
prevLen := bm.compressor.Written()

// decode the RLP block.
var block types.Block
@@ -168,17 +167,16 @@ func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error
if innerErr := bm.compressor.Revert(); innerErr != nil {
return false, fmt.Errorf("when reverting compressor because writing failed: %w\noriginal error: %w", innerErr, err)
}
bm.Header.removeLastBlock()
return false, fmt.Errorf("when writing block to compressor: %w", err)
}

// increment length of the current batch
bm.Header.addBlock(blockLen)
bm.header.addBlock(blockLen)
// write the header to get its length.
bm.buf.Reset()
if _, err = bm.Header.WriteTo(&bm.buf); err != nil {
if _, err = bm.header.WriteTo(&bm.buf); err != nil {
// the only possible error is an underlying writer error (shouldn't happen since we use a simple in-memory buffer)
bm.Header.removeLastBlock()
bm.header.removeLastBlock()
return false, fmt.Errorf("when writing header to buffer: %w", err)
}

@@ -190,45 +188,80 @@ func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error
if err := bm.compressor.Revert(); err != nil {
return false, fmt.Errorf("when reverting compressor because uncompressed blob is > maxUncompressedSize: %w", err)
}
bm.Header.removeLastBlock()
bm.header.removeLastBlock()
return false, nil
}

fitsInBlob := func() bool {
return encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit
}

payload := bm.compressor.WrittenBytes()
recompressionAttempted := false
revert := func() error { // from this point on, we may have recompressed the entire payload in one go
// that makes the compressor's own Revert method unusable.
bm.header.removeLastBlock()
if !recompressionAttempted { // fast path for most "CanWrite" calls
return bm.compressor.Revert()
}
// we can't use the compressor's own Revert method because we tried to compress in one go.
bm.compressor.Reset()
_, err := bm.compressor.Write(payload[:prevLen])
return wrapError(err, "reverting the compressor")
}

// check that the header + the compressed data fits in the blob
fitsInBlob := encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit
if !fitsInBlob {
// first thing to check is if we bypass compression, would that fit?
if !fitsInBlob() {
recompressionAttempted = true

// first thing to check is whether we can fit the block if we recompress everything in one go, known to achieve a higher ratio.
bm.compressor.Reset()
if _, err = bm.compressor.Write(payload); err != nil {
err = fmt.Errorf("when recompressing the blob: %w", err)

if innerErr := revert(); innerErr != nil {
err = fmt.Errorf("%w\n\tto recover from write failure: %w", innerErr, err)
}

return false, err
}
if fitsInBlob() {
goto bypass
}

// that didn't work. a "desperate" attempt is not to compress at all.
if bm.compressor.ConsiderBypassing() {
// we can bypass compression and get a better ratio.
// let's check if now we fit in the blob.
if encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit {
if fitsInBlob() {
goto bypass
}
}

// discard.
if err = bm.compressor.Revert(); err != nil {
if err = revert(); err != nil {
return false, fmt.Errorf("when reverting compressor because blob is full: %w", err)
}
bm.Header.removeLastBlock()
return false, nil
}
bypass:
if forceReset {
// we don't want to append the data, but we could have.
if err = bm.compressor.Revert(); err != nil {
return false, fmt.Errorf("when reverting compressor (blob is not full but forceReset == true): %w", err)
if err = revert(); err != nil {
return false, fmt.Errorf("%w\nreverting because forceReset == true even though the blob isn't full", err)
}
bm.Header.removeLastBlock()
return true, nil
}

// copy the compressed data to the blob
bm.packBuffer.Reset()
n2, err := encode.PackAlign(&bm.packBuffer, bm.buf.Bytes(), fr381.Bits-1, encode.WithAdditionalInput(bm.compressor.Bytes()))
if err != nil {
bm.compressor.Revert()
bm.Header.removeLastBlock()
err = fmt.Errorf("when packing blob: %w", err)
innerErr := revert()
if innerErr != nil {
err = fmt.Errorf("%w\n\twhen attempting to recover from: %w", innerErr, err)
}
return false, err
}
bm.currentBlobLength = int(n2)
@@ -240,9 +273,9 @@
// Clone returns an (almost) deep copy of the bm -- this is used for test purposes.
func (bm *BlobMaker) Clone() *BlobMaker {
deepCopy := *bm
deepCopy.Header.BatchSizes = make([]int, len(bm.Header.BatchSizes))
deepCopy.header.BatchSizes = make([]int, len(bm.header.BatchSizes))

copy(deepCopy.Header.BatchSizes, bm.Header.BatchSizes)
copy(deepCopy.header.BatchSizes, bm.header.BatchSizes)

return &deepCopy
}
@@ -258,10 +291,10 @@ func (bm *BlobMaker) Equals(other *BlobMaker) bool {
if !bytes.Equal(bm.currentBlob[:bm.currentBlobLength], other.currentBlob[:other.currentBlobLength]) {
return false
}
if len(bm.Header.BatchSizes) != len(other.Header.BatchSizes) {
if len(bm.header.BatchSizes) != len(other.header.BatchSizes) {
return false
}
if !slices.Equal(bm.Header.BatchSizes, other.Header.BatchSizes) {
if !slices.Equal(bm.header.BatchSizes, other.header.BatchSizes) {
return false
}
return true
Expand Down Expand Up @@ -409,3 +442,10 @@ func (bm *BlobMaker) RawCompressedSize(data []byte) (int, error) {

return n, nil
}

func wrapError(err error, format string, args ...any) error {
if err == nil {
return nil
}
return fmt.Errorf(format+": %w", append(args, err)...)
}
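
A note on the revert strategy above: once the compressor has been Reset and fed the whole payload in one go, its built-in Revert can no longer undo just the last block, so the code rebuilds the compressor from the retained prefix. The standalone sketch below (not part of this PR, placeholder dictionary and block contents) illustrates that pattern using only the lzss methods visible in this diff:

package main

import (
	"fmt"

	"github.com/consensys/compress/lzss"
)

func main() {
	// placeholder dictionary and blocks; production code loads compressor_dict.bin
	dict := []byte("example dictionary 0123456789abcdef")
	block1 := []byte("first block payload")
	block2 := []byte("second block payload")

	compressor, err := lzss.NewCompressor(dict)
	if err != nil {
		panic(err)
	}
	if _, err = compressor.Write(block1); err != nil {
		panic(err)
	}

	// snapshot the input length before the candidate write, as prevLen does in Write
	prevLen := compressor.Written()

	if _, err = compressor.Write(block2); err != nil {
		panic(err)
	}
	payload := compressor.WrittenBytes() // all input seen so far, as in the diff

	// recompress everything in one go, as the !fitsInBlob() branch does;
	// after this Reset, compressor.Revert() can no longer undo just block2
	compressor.Reset()
	if _, err = compressor.Write(payload); err != nil {
		panic(err)
	}

	// manual revert, mirroring the revert closure: rebuild from the retained prefix
	compressor.Reset()
	if _, err = compressor.Write(payload[:prevLen]); err != nil {
		panic(err)
	}
	fmt.Println("input bytes after revert:", compressor.Written())
}
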
26 changes: 0 additions & 26 deletions prover/lib/compressor/blob/v1/test_utils/blob_maker_testing.go
@@ -9,11 +9,8 @@ import (
"path/filepath"
"strings"

"github.com/consensys/compress/lzss"
fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
"github.com/consensys/linea-monorepo/prover/backend/execution"
"github.com/consensys/linea-monorepo/prover/lib/compressor/blob"
"github.com/consensys/linea-monorepo/prover/lib/compressor/blob/encode"
v1 "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -81,29 +78,6 @@ func RandIntn(n int) int { // TODO @Tabaie remove
return int(binary.BigEndian.Uint64(b[:]) % uint64(n))
}

func EmptyBlob(t require.TestingT) []byte {
var headerB bytes.Buffer

repoRoot, err := blob.GetRepoRootPath()
assert.NoError(t, err)
// Init bm
bm, err := v1.NewBlobMaker(1000, filepath.Join(repoRoot, "prover/lib/compressor/compressor_dict.bin"))
assert.NoError(t, err)

if _, err = bm.Header.WriteTo(&headerB); err != nil {
panic(err)
}

compressor, err := lzss.NewCompressor(GetDict(t))
assert.NoError(t, err)

var bb bytes.Buffer
if _, err = encode.PackAlign(&bb, headerB.Bytes(), fr381.Bits-1, encode.WithAdditionalInput(compressor.Bytes())); err != nil {
panic(err)
}
return bb.Bytes()
}

func SingleBlockBlob(t require.TestingT) []byte {
testBlocks, bm := TestBlocksAndBlobMaker(t)

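
For orientation, a minimal driver for the BlobMaker API that both files exercise might look like the sketch below. It is illustrative only: the data limit, dictionary path, and input slice are placeholder assumptions, and only methods visible in this diff (NewBlobMaker, Write, Bytes, Len, Reset, StartNewBatch) are used.

package main

import (
	"fmt"

	v1 "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/v1"
)

func main() {
	// placeholder data limit and dictionary path
	bm, err := v1.NewBlobMaker(128*1024, "compressor_dict.bin")
	if err != nil {
		panic(err)
	}

	var rlpBlocks [][]byte // placeholder: RLP-encoded blocks to pack

	for _, block := range rlpBlocks {
		ok, err := bm.Write(block, false)
		if err != nil {
			panic(err)
		}
		if !ok {
			// the block didn't fit: emit the current blob and start a fresh one
			fmt.Printf("emitting blob of %d bytes\n", len(bm.Bytes()))
			bm.Reset()
			// retry once; a block too large even for an empty blob is dropped in this toy
			if _, err := bm.Write(block, false); err != nil {
				panic(err)
			}
		}
	}
	bm.StartNewBatch() // seal the final batch
	fmt.Printf("final blob: %d bytes\n", bm.Len())
}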