diff --git a/prover/lib/compressor/blob/v1/blob_maker.go b/prover/lib/compressor/blob/v1/blob_maker.go
index b1a9ba944..96e6eaf1c 100644
--- a/prover/lib/compressor/blob/v1/blob_maker.go
+++ b/prover/lib/compressor/blob/v1/blob_maker.go
@@ -4,18 +4,16 @@ import (
 	"bytes"
 	"errors"
 	"fmt"
+	"github.com/consensys/compress/lzss"
+	fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
 	"github.com/consensys/linea-monorepo/prover/lib/compressor/blob/dictionary"
 	"github.com/consensys/linea-monorepo/prover/lib/compressor/blob/encode"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/rlp"
+	"github.com/sirupsen/logrus"
 	"os"
 	"slices"
 	"strings"
-
-	fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
-	"github.com/sirupsen/logrus"
-
-	"github.com/consensys/compress/lzss"
-	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/rlp"
 )
 
 const (
@@ -38,7 +36,7 @@ type BlobMaker struct {
 	dict      []byte           // dictionary used for compression
 	dictStore dictionary.Store // dictionary store comprising only dict, used for decompression sanity checks
 
-	Header Header
+	header Header
 
 	// contains currentBlob data from latest **valid** call to Write
 	// that is the header (uncompressed) and the body (compressed)
@@ -73,7 +71,7 @@ func NewBlobMaker(dataLimit int, dictPath string) (*BlobMaker, error) {
 	if err != nil {
 		return nil, err
 	}
-	copy(blobMaker.Header.DictChecksum[:], dictChecksum)
+	copy(blobMaker.header.DictChecksum[:], dictChecksum)
 
 	blobMaker.compressor, err = lzss.NewCompressor(dict)
 	if err != nil {
@@ -88,18 +86,18 @@ func NewBlobMaker(dataLimit int, dictPath string) (*BlobMaker, error) {
 
 // StartNewBatch starts a new batch of blocks.
 func (bm *BlobMaker) StartNewBatch() {
-	bm.Header.sealBatch()
+	bm.header.sealBatch()
 }
 
 // Reset resets the bm to its initial state.
 func (bm *BlobMaker) Reset() {
-	bm.Header.resetTable()
+	bm.header.resetTable()
 	bm.currentBlobLength = 0
 	bm.buf.Reset()
 	bm.packBuffer.Reset()
 	bm.compressor.Reset()
 
-	bm.StartNewBatch()
+	bm.header.sealBatch()
 }
 
 // Len returns the length of the compressed data, which includes the header.
@@ -120,14 +118,14 @@ func (bm *BlobMaker) Bytes() []byte {
 	if err != nil {
 		var sbb strings.Builder
 		fmt.Fprintf(&sbb, "invalid blob: %v\n", err)
-		fmt.Fprintf(&sbb, "header: %v\n", bm.Header)
+		fmt.Fprintf(&sbb, "header: %v\n", bm.header)
 		fmt.Fprintf(&sbb, "bm.currentBlobLength: %v\n", bm.currentBlobLength)
 		fmt.Fprintf(&sbb, "bm.currentBlob: %x\n", bm.currentBlob[:bm.currentBlobLength])
 		panic(sbb.String())
 	}
 
 	// compare the header
-	if !header.Equals(&bm.Header) {
+	if !header.Equals(&bm.header) {
 		panic("invalid blob: header mismatch")
 	}
 	if !bytes.Equal(rawBlocks, bm.compressor.WrittenBytes()) {
@@ -140,6 +138,7 @@
 // Write attempts to append the RLP block to the current batch.
 // if forceReset is set, this will NOT append the bytes, but still returns true if the chunk could have been appended
 func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error) {
+	prevLen := bm.compressor.Written()
 
 	// decode the RLP block.
 	var block types.Block
@@ -168,17 +167,16 @@ func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error) {
 		if innerErr := bm.compressor.Revert(); innerErr != nil {
 			return false, fmt.Errorf("when reverting compressor because writing failed: %w\noriginal error: %w", innerErr, err)
 		}
-		bm.Header.removeLastBlock()
 		return false, fmt.Errorf("when writing block to compressor: %w", err)
 	}
 
 	// increment length of the current batch
-	bm.Header.addBlock(blockLen)
+	bm.header.addBlock(blockLen)
 
 	// write the header to get its length.
 	bm.buf.Reset()
-	if _, err = bm.Header.WriteTo(&bm.buf); err != nil {
+	if _, err = bm.header.WriteTo(&bm.buf); err != nil {
 		// only possible error is an underlying writer error (shouldn't happen, as we use a simple in-memory buffer)
-		bm.Header.removeLastBlock()
+		bm.header.removeLastBlock()
 		return false, fmt.Errorf("when writing header to buffer: %w", err)
 	}
@@ -190,36 +188,68 @@ func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error) {
 		if err := bm.compressor.Revert(); err != nil {
 			return false, fmt.Errorf("when reverting compressor because uncompressed blob is > maxUncompressedSize: %w", err)
 		}
-		bm.Header.removeLastBlock()
+		bm.header.removeLastBlock()
 		return false, nil
 	}
 
+	fitsInBlob := func() bool {
+		return encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit
+	}
+
+	payload := bm.compressor.WrittenBytes()
+	recompressionAttempted := false
+
+	revert := func() error { // from this point on, we may have recompressed the entire payload in one go
+		// that makes the compressor's own Revert method unusable.
+		bm.header.removeLastBlock()
+		if !recompressionAttempted { // fast path for most "CanWrite" calls
			return bm.compressor.Revert()
+		}
+		// we can't use the compressor's own Revert method because we tried to compress in one go.
+		bm.compressor.Reset()
+		_, err := bm.compressor.Write(payload[:prevLen])
+		return wrapError(err, "reverting the compressor")
+	}
+
 	// check that the header + the compressed data fits in the blob
-	fitsInBlob := encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit
-	if !fitsInBlob {
-		// first thing to check is if we bypass compression, would that fit?
+	if !fitsInBlob() {
+		recompressionAttempted = true
+
+		// first thing to check is whether we can fit the block if we recompress everything in one go, which is known to achieve a higher ratio.
+		bm.compressor.Reset()
+		if _, err = bm.compressor.Write(payload); err != nil {
+			err = fmt.Errorf("when recompressing the blob: %w", err)
+
+			if innerErr := revert(); innerErr != nil {
+				err = fmt.Errorf("%w\n\tto recover from write failure: %w", innerErr, err)
+			}
+
+			return false, err
+		}
+		if fitsInBlob() {
+			goto bypass
+		}
+
+		// that didn't work. a "desperate" attempt is to not compress at all.
 		if bm.compressor.ConsiderBypassing() {
 			// we can bypass compression and get a better ratio.
 			// let's check if now we fit in the blob.
-			if encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit {
+			if fitsInBlob() {
 				goto bypass
 			}
 		}
 
 		// discard.
-		if err = bm.compressor.Revert(); err != nil {
+		if err = revert(); err != nil {
 			return false, fmt.Errorf("when reverting compressor because blob is full: %w", err)
 		}
-		bm.Header.removeLastBlock()
 		return false, nil
 	}
 
 bypass:
 	if forceReset {
 		// we don't want to append the data, but we could have.
-		if err = bm.compressor.Revert(); err != nil {
-			return false, fmt.Errorf("when reverting compressor (blob is not full but forceReset == true): %w", err)
+		if err = revert(); err != nil {
+			return false, fmt.Errorf("when reverting because forceReset == true even though the blob isn't full: %w", err)
 		}
-		bm.Header.removeLastBlock()
 		return true, nil
 	}
 
@@ -227,8 +257,11 @@ bypass:
 	bm.packBuffer.Reset()
 	n2, err := encode.PackAlign(&bm.packBuffer, bm.buf.Bytes(), fr381.Bits-1, encode.WithAdditionalInput(bm.compressor.Bytes()))
 	if err != nil {
-		bm.compressor.Revert()
-		bm.Header.removeLastBlock()
+		err = fmt.Errorf("when packing blob: %w", err)
+		innerErr := revert()
+		if innerErr != nil {
+			err = fmt.Errorf("%w\n\twhen attempting to recover from: %w", innerErr, err)
+		}
-		return false, fmt.Errorf("when packing blob: %w", err)
+		return false, err
 	}
 	bm.currentBlobLength = int(n2)
@@ -240,9 +273,9 @@ bypass:
 // Clone returns an (almost) deep copy of the bm -- this is used for test purposes.
 func (bm *BlobMaker) Clone() *BlobMaker {
 	deepCopy := *bm
-	deepCopy.Header.BatchSizes = make([]int, len(bm.Header.BatchSizes))
+	deepCopy.header.BatchSizes = make([]int, len(bm.header.BatchSizes))
 
-	copy(deepCopy.Header.BatchSizes, bm.Header.BatchSizes)
+	copy(deepCopy.header.BatchSizes, bm.header.BatchSizes)
 
 	return &deepCopy
 }
@@ -258,10 +291,10 @@ func (bm *BlobMaker) Equals(other *BlobMaker) bool {
 	if !bytes.Equal(bm.currentBlob[:bm.currentBlobLength], other.currentBlob[:other.currentBlobLength]) {
 		return false
 	}
-	if len(bm.Header.BatchSizes) != len(other.Header.BatchSizes) {
+	if len(bm.header.BatchSizes) != len(other.header.BatchSizes) {
 		return false
 	}
-	if !slices.Equal(bm.Header.BatchSizes, other.Header.BatchSizes) {
+	if !slices.Equal(bm.header.BatchSizes, other.header.BatchSizes) {
 		return false
 	}
 	return true
@@ -409,3 +442,10 @@ func (bm *BlobMaker) RawCompressedSize(data []byte) (int, error) {
 
 	return n, nil
 }
+
+func wrapError(err error, format string, args ...any) error {
+	if err == nil {
+		return nil
+	}
+	return fmt.Errorf(format+": %w", append(args, err)...)
+}
diff --git a/prover/lib/compressor/blob/v1/test_utils/blob_maker_testing.go b/prover/lib/compressor/blob/v1/test_utils/blob_maker_testing.go
index 7c0025013..67c3c041a 100644
--- a/prover/lib/compressor/blob/v1/test_utils/blob_maker_testing.go
+++ b/prover/lib/compressor/blob/v1/test_utils/blob_maker_testing.go
@@ -9,11 +9,8 @@ import (
 	"path/filepath"
 	"strings"
 
-	"github.com/consensys/compress/lzss"
-	fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr"
 	"github.com/consensys/linea-monorepo/prover/backend/execution"
 	"github.com/consensys/linea-monorepo/prover/lib/compressor/blob"
-	"github.com/consensys/linea-monorepo/prover/lib/compressor/blob/encode"
 	v1 "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/v1"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -81,29 +78,6 @@ func RandIntn(n int) int { // TODO @Tabaie remove
 	return int(binary.BigEndian.Uint64(b[:]) % uint64(n))
 }
 
-func EmptyBlob(t require.TestingT) []byte {
-	var headerB bytes.Buffer
-
-	repoRoot, err := blob.GetRepoRootPath()
-	assert.NoError(t, err)
-	// Init bm
-	bm, err := v1.NewBlobMaker(1000, filepath.Join(repoRoot, "prover/lib/compressor/compressor_dict.bin"))
-	assert.NoError(t, err)
-
-	if _, err = bm.Header.WriteTo(&headerB); err != nil {
-		panic(err)
-	}
-
-	compressor, err := lzss.NewCompressor(GetDict(t))
-	assert.NoError(t, err)
-
-	var bb bytes.Buffer
-	if _, err = encode.PackAlign(&bb, headerB.Bytes(), fr381.Bits-1, encode.WithAdditionalInput(compressor.Bytes())); err != nil {
-		panic(err)
-	}
-	return bb.Bytes()
-}
-
 func SingleBlockBlob(t require.TestingT) []byte {
 	testBlocks, bm := TestBlocksAndBlobMaker(t)
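
Reviewer note (not part of the patch): the heart of this change is that Write may now recompress the whole payload in one go when the blob overflows, which invalidates the compressor's single-step Revert; the new revert closure therefore restores state by resetting the compressor and replaying the pre-write prefix, with wrapError keeping the recovery paths terse. The standalone sketch below is a minimal illustration of that replay pattern. It is hypothetical: toyCompressor and main are invented stand-ins, not the lzss.Compressor API, and the header bookkeeping (removeLastBlock) is omitted; only wrapError is copied from the diff.

package main

import (
	"bytes"
	"fmt"
)

// toyCompressor is an invented stand-in exposing the same
// Write/Written/WrittenBytes/Reset/Revert surface BlobMaker relies on.
type toyCompressor struct {
	buf     bytes.Buffer
	lastLen int // buffer length before the most recent Write; backs single-step Revert
}

func (c *toyCompressor) Write(p []byte) (int, error) {
	c.lastLen = c.buf.Len()
	return c.buf.Write(p)
}

func (c *toyCompressor) Written() int         { return c.buf.Len() }
func (c *toyCompressor) WrittenBytes() []byte { return c.buf.Bytes() }
func (c *toyCompressor) Reset()               { c.buf.Reset(); c.lastLen = 0 }
func (c *toyCompressor) Revert() error        { c.buf.Truncate(c.lastLen); return nil }

// wrapError is the helper introduced in the diff above.
func wrapError(err error, format string, args ...any) error {
	if err == nil {
		return nil
	}
	return fmt.Errorf(format+": %w", append(args, err)...)
}

func main() {
	c := &toyCompressor{}
	_, _ = c.Write([]byte("block-1;"))

	// snapshot taken before the speculative write, as in BlobMaker.Write
	prevLen := c.Written()
	payload := append([]byte(nil), c.WrittenBytes()...) // copied so this toy's Reset cannot clobber it
	recompressionAttempted := false

	_, _ = c.Write([]byte("block-2;")) // speculative write

	// suppose the blob overflowed: recompress the whole payload in one go,
	// which destroys the compressor's single-step undo state
	recompressionAttempted = true
	c.Reset()
	_, _ = c.Write(append(payload, []byte("block-2;")...))

	// replay-based revert: once recompression was attempted, Revert() is
	// meaningless, so reset and rewrite the pre-write prefix instead
	revert := func() error {
		if !recompressionAttempted {
			return c.Revert() // fast path
		}
		c.Reset()
		_, err := c.Write(payload[:prevLen])
		return wrapError(err, "reverting the compressor")
	}

	if err := revert(); err != nil {
		panic(err)
	}
	fmt.Printf("restored state: %q\n", c.WrittenBytes()) // "block-1;"
}

Running it prints restored state: "block-1;", i.e. the speculative second block leaves no trace, which is the same guarantee BlobMaker.Write needs so that a failed or forceReset write leaves the blob untouched.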