From b2a5010dd118f68fa29ef98a8f1ef26377db2f60 Mon Sep 17 00:00:00 2001 From: Arya Tabaie <15056835+Tabaie@users.noreply.github.com> Date: Fri, 29 Nov 2024 17:48:02 -0600 Subject: [PATCH 1/5] feat clock based optimizer --- prover/lib/compressor/blob/v1/blob_maker.go | 129 ++++++++++++++++++++ 1 file changed, 129 insertions(+) diff --git a/prover/lib/compressor/blob/v1/blob_maker.go b/prover/lib/compressor/blob/v1/blob_maker.go index b1a9ba944..adf739925 100644 --- a/prover/lib/compressor/blob/v1/blob_maker.go +++ b/prover/lib/compressor/blob/v1/blob_maker.go @@ -2,6 +2,7 @@ package v1 import ( "bytes" + "encoding/base64" "errors" "fmt" "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/dictionary" @@ -9,6 +10,8 @@ import ( "os" "slices" "strings" + "sync" + "time" fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" "github.com/sirupsen/logrus" @@ -49,6 +52,11 @@ type BlobMaker struct { // some buffers to avoid repeated allocations buf bytes.Buffer packBuffer bytes.Buffer + + lock sync.Mutex + optimizerClock *time.Ticker + modificationNum uint64 // keeps track of changes to the blob + optimizerModificationNum uint64 // the last modification used by the optimizer } // NewBlobMaker returns a new bm. @@ -88,11 +96,19 @@ func NewBlobMaker(dataLimit int, dictPath string) (*BlobMaker, error) { // StartNewBatch starts a new batch of blocks. func (bm *BlobMaker) StartNewBatch() { + bm.lock.Lock() + defer bm.lock.Unlock() + bm.modificationNum++ + bm.Header.sealBatch() } // Reset resets the bm to its initial state. func (bm *BlobMaker) Reset() { + bm.lock.Lock() + defer bm.lock.Unlock() + bm.modificationNum++ + bm.Header.resetTable() bm.currentBlobLength = 0 bm.buf.Reset() @@ -114,6 +130,9 @@ func (bm *BlobMaker) Written() int { // Bytes returns the compressed data. Note that it returns a slice of the internal buffer, // it is the caller's responsibility to copy the data if needed. func (bm *BlobMaker) Bytes() []byte { + bm.lock.Lock() + defer bm.lock.Unlock() + if bm.currentBlobLength > 0 { // sanity check that we can always decompress. header, rawBlocks, _, err := DecompressBlob(bm.currentBlob[:bm.currentBlobLength], bm.dictStore) @@ -140,6 +159,9 @@ func (bm *BlobMaker) Bytes() []byte { // Write attempts to append the RLP block to the current batch. // if forceReset is set; this will NOT append the bytes but still returns true if the chunk could have been appended func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error) { + bm.lock.Lock() + defer bm.lock.Unlock() + bm.modificationNum++ // decode the RLP block. var block types.Block @@ -409,3 +431,110 @@ func (bm *BlobMaker) RawCompressedSize(data []byte) (int, error) { return n, nil } + +func (bm *BlobMaker) StopOptimizer() { + bm.lock.Lock() // if the optimizer is currently running, let it finish + if bm.optimizerClock != nil { + bm.optimizerClock.Stop() + } + bm.lock.Unlock() +} + +// StartOptimizer starts an optimizer that will try to compress the blob further periodically. 
+func (bm *BlobMaker) StartOptimizer(period time.Duration) { + if bm.optimizerClock != nil { + bm.lock.Lock() + bm.optimizerClock.Reset(period) + bm.lock.Unlock() + return + } + + bm.optimizerClock = time.NewTicker(period) + go func() { + var headerBuffer, packingBuffer bytes.Buffer + packingBuffer.Grow(MaxUsableBytes) + compressor, err := lzss.NewCompressor(bm.dict) + if err != nil { + logrus.WithError(err).Error("optimizer failed to create compressor") + return + } + + for range bm.optimizerClock.C { + + modificationNum := bm.modificationNum + if modificationNum == bm.optimizerModificationNum { + continue // no new data to optimize + } + + headerBuffer.Reset() + packingBuffer.Reset() + compressor.Reset() + + if _, err = bm.Header.WriteTo(&headerBuffer); err != nil { + logrus.WithError(err).Error("optimizer failed to write header to buffer") + continue + } + + if _, err = compressor.Write(bm.compressor.WrittenBytes()); err != nil { + logrus.WithError(err).Error("optimizer failed to write compressor data") + continue + } + + n, err := encode.PackAlign(&packingBuffer, headerBuffer.Bytes(), fr381.Bits-1, encode.WithAdditionalInput(compressor.Bytes())) + if err != nil { + logrus.WithError(err).Error("optimizer failed to pack align") + continue + } + + // only go ahead with this if we're getting a better compression ratio + if bm.compressor.Len() < compressor.Len() { + logrus.Info("optimizer getting a worse compression ratio") + continue + } + + if n > MaxUsableBytes { + logrus.Info("optimizer outgrowing blob capacity") + continue + } + + // decompress and validate + // they may not be as catastrophic as they seem + // could be caused by changes to the blob as this method was running + header, payload, _, err := DecompressBlob(packingBuffer.Bytes(), bm.dictStore) + if err != nil { + logrus.WithError(err).Warn("optimizer failed to decompress blob", base64.StdEncoding.EncodeToString(packingBuffer.Bytes())) + continue + } + + if !header.Equals(&bm.Header) { + logrus.Warn("optimizer header mismatch") + continue + } + + if !bytes.Equal(payload, bm.compressor.WrittenBytes()) { + logrus.Warn("optimizer payload mismatch") + continue + } + + bm.lock.Lock() + + // blob has been modified since the optimizer started + if bm.modificationNum != modificationNum { + logrus.Warn("blob changed under the optimiser's feet") + bm.lock.Unlock() + continue + } + + // swap the compressors + bm.compressor, compressor = compressor, bm.compressor + + // store the new blob + bm.currentBlobLength = int(n) + copy(bm.currentBlob[:n], packingBuffer.Bytes()) + + bm.lock.Unlock() + + bm.optimizerModificationNum = modificationNum + } + }() +} From dc76a1e303ea2af29317394d6e576f756aed729a Mon Sep 17 00:00:00 2001 From: Arya Tabaie <15056835+Tabaie@users.noreply.github.com> Date: Fri, 29 Nov 2024 18:59:52 -0600 Subject: [PATCH 2/5] feat synchronously-called optimizer --- prover/lib/compressor/blob/v1/blob_maker.go | 95 ++++++++++++--------- 1 file changed, 55 insertions(+), 40 deletions(-) diff --git a/prover/lib/compressor/blob/v1/blob_maker.go b/prover/lib/compressor/blob/v1/blob_maker.go index adf739925..fdebb4fd5 100644 --- a/prover/lib/compressor/blob/v1/blob_maker.go +++ b/prover/lib/compressor/blob/v1/blob_maker.go @@ -5,16 +5,14 @@ import ( "encoding/base64" "errors" "fmt" + fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/dictionary" "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/encode" + "github.com/sirupsen/logrus" "os" "slices" 
"strings" "sync" - "time" - - fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" - "github.com/sirupsen/logrus" "github.com/consensys/compress/lzss" "github.com/ethereum/go-ethereum/core/types" @@ -53,10 +51,10 @@ type BlobMaker struct { buf bytes.Buffer packBuffer bytes.Buffer - lock sync.Mutex - optimizerClock *time.Ticker - modificationNum uint64 // keeps track of changes to the blob - optimizerModificationNum uint64 // the last modification used by the optimizer + lock sync.Mutex // protects from concurrent writes + doOptimize chan struct{} // signal to the optimizer that a change has been made + modificationNum uint64 // id for the last change to blob + optimizerModificationNum uint64 // the last change used by the optimizer } // NewBlobMaker returns a new bm. @@ -91,22 +89,29 @@ func NewBlobMaker(dataLimit int, dictPath string) (*BlobMaker, error) { // initialize state blobMaker.StartNewBatch() + blobMaker.startOptimizer() + return &blobMaker, nil } // StartNewBatch starts a new batch of blocks. func (bm *BlobMaker) StartNewBatch() { - bm.lock.Lock() - defer bm.lock.Unlock() - bm.modificationNum++ + bm.obtainLock() + defer bm.releaseLock() + bm.startNewBatch() +} +// thread-unsafe version of StartNewBatch +// to be called from a method with lock ownership +func (bm *BlobMaker) startNewBatch() { + bm.modificationNum++ bm.Header.sealBatch() } // Reset resets the bm to its initial state. func (bm *BlobMaker) Reset() { - bm.lock.Lock() - defer bm.lock.Unlock() + bm.obtainLock() + defer bm.releaseLock() bm.modificationNum++ bm.Header.resetTable() @@ -115,7 +120,7 @@ func (bm *BlobMaker) Reset() { bm.packBuffer.Reset() bm.compressor.Reset() - bm.StartNewBatch() + bm.startNewBatch() } // Len returns the length of the compressed data, which includes the header. @@ -130,8 +135,8 @@ func (bm *BlobMaker) Written() int { // Bytes returns the compressed data. Note that it returns a slice of the internal buffer, // it is the caller's responsibility to copy the data if needed. func (bm *BlobMaker) Bytes() []byte { - bm.lock.Lock() - defer bm.lock.Unlock() + bm.obtainLock() + defer bm.releaseLock() if bm.currentBlobLength > 0 { // sanity check that we can always decompress. @@ -159,8 +164,8 @@ func (bm *BlobMaker) Bytes() []byte { // Write attempts to append the RLP block to the current batch. // if forceReset is set; this will NOT append the bytes but still returns true if the chunk could have been appended func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error) { - bm.lock.Lock() - defer bm.lock.Unlock() + bm.obtainLock() + defer bm.releaseLock() bm.modificationNum++ // decode the RLP block. @@ -256,6 +261,7 @@ bypass: bm.currentBlobLength = int(n2) copy(bm.currentBlob[:bm.currentBlobLength], bm.packBuffer.Bytes()) + bm.tryOptimize() return true, nil } @@ -433,37 +439,35 @@ func (bm *BlobMaker) RawCompressedSize(data []byte) (int, error) { } func (bm *BlobMaker) StopOptimizer() { - bm.lock.Lock() // if the optimizer is currently running, let it finish - if bm.optimizerClock != nil { - bm.optimizerClock.Stop() + if bm.doOptimize != nil { + close(bm.doOptimize) } - bm.lock.Unlock() } -// StartOptimizer starts an optimizer that will try to compress the blob further periodically. 
-func (bm *BlobMaker) StartOptimizer(period time.Duration) { - if bm.optimizerClock != nil { - bm.lock.Lock() - bm.optimizerClock.Reset(period) - bm.lock.Unlock() - return +func (bm *BlobMaker) tryOptimize() { + if bm.doOptimize != nil && len(bm.doOptimize) == 0 { + bm.doOptimize <- struct{}{} } +} - bm.optimizerClock = time.NewTicker(period) +func (bm *BlobMaker) startOptimizer() { + bm.doOptimize = make(chan struct{}, 1) go func() { var headerBuffer, packingBuffer bytes.Buffer - packingBuffer.Grow(MaxUsableBytes) + packingBuffer.Grow(MaxUsableBytes) // used for packing and holding the raw payload compressor, err := lzss.NewCompressor(bm.dict) if err != nil { logrus.WithError(err).Error("optimizer failed to create compressor") return } - for range bm.optimizerClock.C { - + for range bm.doOptimize { modificationNum := bm.modificationNum - if modificationNum == bm.optimizerModificationNum { - continue // no new data to optimize + if bm.optimizerModificationNum == modificationNum { + continue // already optimized + } + if bm.optimizerModificationNum > modificationNum { + panic("optimizer modification num is ahead of the actual modification num") } headerBuffer.Reset() @@ -516,25 +520,36 @@ func (bm *BlobMaker) StartOptimizer(period time.Duration) { continue } - bm.lock.Lock() + bm.obtainLock() // blob has been modified since the optimizer started if bm.modificationNum != modificationNum { - logrus.Warn("blob changed under the optimiser's feet") - bm.lock.Unlock() + logrus.Warn("optimized data version obsolete") + bm.releaseLock() continue } - // swap the compressors + // swap the compressors (our current one has the payload) bm.compressor, compressor = compressor, bm.compressor // store the new blob bm.currentBlobLength = int(n) copy(bm.currentBlob[:n], packingBuffer.Bytes()) - bm.lock.Unlock() + bm.releaseLock() bm.optimizerModificationNum = modificationNum + logrus.Info("optimizer success") } }() } + +// obtainLock and releaseLock wrap bm.lock.Lock and bm.lock.Unlock respectively, in order to enable +// logging for debugging purposes. Normally (hopefully?) they are be inlined by the compiler. 
+func (bm *BlobMaker) obtainLock() { + bm.lock.Lock() +} + +func (bm *BlobMaker) releaseLock() { + bm.lock.Unlock() +} From a5292bb5c9c0b897a7530ccda003d388f88dc6d6 Mon Sep 17 00:00:00 2001 From: Arya Tabaie <15056835+Tabaie@users.noreply.github.com> Date: Fri, 29 Nov 2024 19:10:04 -0600 Subject: [PATCH 3/5] perf: insistent optimizer --- prover/lib/compressor/blob/v1/blob_maker.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/prover/lib/compressor/blob/v1/blob_maker.go b/prover/lib/compressor/blob/v1/blob_maker.go index fdebb4fd5..64ebbc9c5 100644 --- a/prover/lib/compressor/blob/v1/blob_maker.go +++ b/prover/lib/compressor/blob/v1/blob_maker.go @@ -438,12 +438,6 @@ func (bm *BlobMaker) RawCompressedSize(data []byte) (int, error) { return n, nil } -func (bm *BlobMaker) StopOptimizer() { - if bm.doOptimize != nil { - close(bm.doOptimize) - } -} - func (bm *BlobMaker) tryOptimize() { if bm.doOptimize != nil && len(bm.doOptimize) == 0 { bm.doOptimize <- struct{}{} @@ -462,6 +456,8 @@ func (bm *BlobMaker) startOptimizer() { } for range bm.doOptimize { + newData: + modificationNum := bm.modificationNum if bm.optimizerModificationNum == modificationNum { continue // already optimized @@ -512,12 +508,12 @@ func (bm *BlobMaker) startOptimizer() { if !header.Equals(&bm.Header) { logrus.Warn("optimizer header mismatch") - continue + goto newData // assuming this is not actually a compression error, but caused by changes to the blob } if !bytes.Equal(payload, bm.compressor.WrittenBytes()) { logrus.Warn("optimizer payload mismatch") - continue + goto newData } bm.obtainLock() @@ -526,7 +522,7 @@ func (bm *BlobMaker) startOptimizer() { if bm.modificationNum != modificationNum { logrus.Warn("optimized data version obsolete") bm.releaseLock() - continue + goto newData } // swap the compressors (our current one has the payload) From 85ca792f0db01995b03f3ca6d50255cb30ae7c59 Mon Sep 17 00:00:00 2001 From: Arya Tabaie <15056835+Tabaie@users.noreply.github.com> Date: Fri, 29 Nov 2024 19:23:13 -0600 Subject: [PATCH 4/5] fix private header --- prover/lib/compressor/blob/v1/blob_maker.go | 46 +++++++++---------- .../blob/v1/test_utils/blob_maker_testing.go | 26 ----------- 2 files changed, 23 insertions(+), 49 deletions(-) diff --git a/prover/lib/compressor/blob/v1/blob_maker.go b/prover/lib/compressor/blob/v1/blob_maker.go index 64ebbc9c5..fe12080d9 100644 --- a/prover/lib/compressor/blob/v1/blob_maker.go +++ b/prover/lib/compressor/blob/v1/blob_maker.go @@ -39,7 +39,7 @@ type BlobMaker struct { dict []byte // dictionary used for compression dictStore dictionary.Store // dictionary store comprising only dict, used for decompression sanity checks - Header Header + header Header // contains currentBlob data from latest **valid** call to Write // that is the header (uncompressed) and the body (compressed) @@ -51,7 +51,7 @@ type BlobMaker struct { buf bytes.Buffer packBuffer bytes.Buffer - lock sync.Mutex // protects from concurrent writes + lock sync.Mutex // protects from concurrent writes by the optimizer. 
no thread safety guarantees doOptimize chan struct{} // signal to the optimizer that a change has been made modificationNum uint64 // id for the last change to blob optimizerModificationNum uint64 // the last change used by the optimizer @@ -79,7 +79,7 @@ func NewBlobMaker(dataLimit int, dictPath string) (*BlobMaker, error) { if err != nil { return nil, err } - copy(blobMaker.Header.DictChecksum[:], dictChecksum) + copy(blobMaker.header.DictChecksum[:], dictChecksum) blobMaker.compressor, err = lzss.NewCompressor(dict) if err != nil { @@ -105,7 +105,7 @@ func (bm *BlobMaker) StartNewBatch() { // to be called from a method with lock ownership func (bm *BlobMaker) startNewBatch() { bm.modificationNum++ - bm.Header.sealBatch() + bm.header.sealBatch() } // Reset resets the bm to its initial state. @@ -114,7 +114,7 @@ func (bm *BlobMaker) Reset() { defer bm.releaseLock() bm.modificationNum++ - bm.Header.resetTable() + bm.header.resetTable() bm.currentBlobLength = 0 bm.buf.Reset() bm.packBuffer.Reset() @@ -135,7 +135,7 @@ func (bm *BlobMaker) Written() int { // Bytes returns the compressed data. Note that it returns a slice of the internal buffer, // it is the caller's responsibility to copy the data if needed. func (bm *BlobMaker) Bytes() []byte { - bm.obtainLock() + bm.obtainLock() // to make sure no one (the optimizer specifically) is modifying the blob, possibly resulting in corrupted data defer bm.releaseLock() if bm.currentBlobLength > 0 { @@ -144,14 +144,14 @@ func (bm *BlobMaker) Bytes() []byte { if err != nil { var sbb strings.Builder fmt.Fprintf(&sbb, "invalid blob: %v\n", err) - fmt.Fprintf(&sbb, "header: %v\n", bm.Header) + fmt.Fprintf(&sbb, "header: %v\n", bm.header) fmt.Fprintf(&sbb, "bm.currentBlobLength: %v\n", bm.currentBlobLength) fmt.Fprintf(&sbb, "bm.currentBlob: %x\n", bm.currentBlob[:bm.currentBlobLength]) panic(sbb.String()) } // compare the header - if !header.Equals(&bm.Header) { + if !header.Equals(&bm.header) { panic("invalid blob: header mismatch") } if !bytes.Equal(rawBlocks, bm.compressor.WrittenBytes()) { @@ -195,17 +195,17 @@ func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error if innerErr := bm.compressor.Revert(); innerErr != nil { return false, fmt.Errorf("when reverting compressor because writing failed: %w\noriginal error: %w", innerErr, err) } - bm.Header.removeLastBlock() + bm.header.removeLastBlock() return false, fmt.Errorf("when writing block to compressor: %w", err) } // increment length of the current batch - bm.Header.addBlock(blockLen) + bm.header.addBlock(blockLen) // write the header to get its length. 
bm.buf.Reset() - if _, err = bm.Header.WriteTo(&bm.buf); err != nil { + if _, err = bm.header.WriteTo(&bm.buf); err != nil { // only possible error is an underlying writer error (shouldn't happen we use a simple in-memory buffer) - bm.Header.removeLastBlock() + bm.header.removeLastBlock() return false, fmt.Errorf("when writing header to buffer: %w", err) } @@ -217,7 +217,7 @@ func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error if err := bm.compressor.Revert(); err != nil { return false, fmt.Errorf("when reverting compressor because uncompressed blob is > maxUncompressedSize: %w", err) } - bm.Header.removeLastBlock() + bm.header.removeLastBlock() return false, nil } @@ -237,7 +237,7 @@ func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error if err = bm.compressor.Revert(); err != nil { return false, fmt.Errorf("when reverting compressor because blob is full: %w", err) } - bm.Header.removeLastBlock() + bm.header.removeLastBlock() return false, nil } bypass: @@ -246,7 +246,7 @@ bypass: if err = bm.compressor.Revert(); err != nil { return false, fmt.Errorf("when reverting compressor (blob is not full but forceReset == true): %w", err) } - bm.Header.removeLastBlock() + bm.header.removeLastBlock() return true, nil } @@ -255,7 +255,7 @@ bypass: n2, err := encode.PackAlign(&bm.packBuffer, bm.buf.Bytes(), fr381.Bits-1, encode.WithAdditionalInput(bm.compressor.Bytes())) if err != nil { bm.compressor.Revert() - bm.Header.removeLastBlock() + bm.header.removeLastBlock() return false, fmt.Errorf("when packing blob: %w", err) } bm.currentBlobLength = int(n2) @@ -268,9 +268,9 @@ bypass: // Clone returns a (almost) deep copy of the bm -- this is used for test purposes. func (bm *BlobMaker) Clone() *BlobMaker { deepCopy := *bm - deepCopy.Header.BatchSizes = make([]int, len(bm.Header.BatchSizes)) + deepCopy.header.BatchSizes = make([]int, len(bm.header.BatchSizes)) - copy(deepCopy.Header.BatchSizes, bm.Header.BatchSizes) + copy(deepCopy.header.BatchSizes, bm.header.BatchSizes) return &deepCopy } @@ -286,10 +286,10 @@ func (bm *BlobMaker) Equals(other *BlobMaker) bool { if !bytes.Equal(bm.currentBlob[:bm.currentBlobLength], other.currentBlob[:other.currentBlobLength]) { return false } - if len(bm.Header.BatchSizes) != len(other.Header.BatchSizes) { + if len(bm.header.BatchSizes) != len(other.header.BatchSizes) { return false } - if !slices.Equal(bm.Header.BatchSizes, other.Header.BatchSizes) { + if !slices.Equal(bm.header.BatchSizes, other.header.BatchSizes) { return false } return true @@ -470,7 +470,7 @@ func (bm *BlobMaker) startOptimizer() { packingBuffer.Reset() compressor.Reset() - if _, err = bm.Header.WriteTo(&headerBuffer); err != nil { + if _, err = bm.header.WriteTo(&headerBuffer); err != nil { logrus.WithError(err).Error("optimizer failed to write header to buffer") continue } @@ -506,7 +506,7 @@ func (bm *BlobMaker) startOptimizer() { continue } - if !header.Equals(&bm.Header) { + if !header.Equals(&bm.header) { logrus.Warn("optimizer header mismatch") goto newData // assuming this is not actually a compression error, but caused by changes to the blob } @@ -541,7 +541,7 @@ func (bm *BlobMaker) startOptimizer() { } // obtainLock and releaseLock wrap bm.lock.Lock and bm.lock.Unlock respectively, in order to enable -// logging for debugging purposes. Normally (hopefully?) they are be inlined by the compiler. +// logging for debugging purposes. Normally (hopefully?) they would be inlined by the compiler. 
func (bm *BlobMaker) obtainLock() { bm.lock.Lock() } diff --git a/prover/lib/compressor/blob/v1/test_utils/blob_maker_testing.go b/prover/lib/compressor/blob/v1/test_utils/blob_maker_testing.go index 7c0025013..67c3c041a 100644 --- a/prover/lib/compressor/blob/v1/test_utils/blob_maker_testing.go +++ b/prover/lib/compressor/blob/v1/test_utils/blob_maker_testing.go @@ -9,11 +9,8 @@ import ( "path/filepath" "strings" - "github.com/consensys/compress/lzss" - fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" "github.com/consensys/linea-monorepo/prover/backend/execution" "github.com/consensys/linea-monorepo/prover/lib/compressor/blob" - "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/encode" v1 "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/v1" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -81,29 +78,6 @@ func RandIntn(n int) int { // TODO @Tabaie remove return int(binary.BigEndian.Uint64(b[:]) % uint64(n)) } -func EmptyBlob(t require.TestingT) []byte { - var headerB bytes.Buffer - - repoRoot, err := blob.GetRepoRootPath() - assert.NoError(t, err) - // Init bm - bm, err := v1.NewBlobMaker(1000, filepath.Join(repoRoot, "prover/lib/compressor/compressor_dict.bin")) - assert.NoError(t, err) - - if _, err = bm.Header.WriteTo(&headerB); err != nil { - panic(err) - } - - compressor, err := lzss.NewCompressor(GetDict(t)) - assert.NoError(t, err) - - var bb bytes.Buffer - if _, err = encode.PackAlign(&bb, headerB.Bytes(), fr381.Bits-1, encode.WithAdditionalInput(compressor.Bytes())); err != nil { - panic(err) - } - return bb.Bytes() -} - func SingleBlockBlob(t require.TestingT) []byte { testBlocks, bm := TestBlocksAndBlobMaker(t) From 8f370b5942cf918cd256fbff46482a5c5630ea97 Mon Sep 17 00:00:00 2001 From: Arya Tabaie <15056835+Tabaie@users.noreply.github.com> Date: Fri, 6 Dec 2024 15:23:13 -0600 Subject: [PATCH 5/5] refactor: synchronous optimization --- prover/lib/compressor/blob/v1/blob_maker.go | 210 +++++--------------- 1 file changed, 55 insertions(+), 155 deletions(-) diff --git a/prover/lib/compressor/blob/v1/blob_maker.go b/prover/lib/compressor/blob/v1/blob_maker.go index fe12080d9..96e6eaf1c 100644 --- a/prover/lib/compressor/blob/v1/blob_maker.go +++ b/prover/lib/compressor/blob/v1/blob_maker.go @@ -2,21 +2,18 @@ package v1 import ( "bytes" - "encoding/base64" "errors" "fmt" + "github.com/consensys/compress/lzss" fr381 "github.com/consensys/gnark-crypto/ecc/bls12-381/fr" "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/dictionary" "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/encode" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/rlp" "github.com/sirupsen/logrus" "os" "slices" "strings" - "sync" - - "github.com/consensys/compress/lzss" - "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/rlp" ) const ( @@ -50,11 +47,6 @@ type BlobMaker struct { // some buffers to avoid repeated allocations buf bytes.Buffer packBuffer bytes.Buffer - - lock sync.Mutex // protects from concurrent writes by the optimizer. no thread safety guarantees - doOptimize chan struct{} // signal to the optimizer that a change has been made - modificationNum uint64 // id for the last change to blob - optimizerModificationNum uint64 // the last change used by the optimizer } // NewBlobMaker returns a new bm. 
@@ -89,38 +81,23 @@ func NewBlobMaker(dataLimit int, dictPath string) (*BlobMaker, error) { // initialize state blobMaker.StartNewBatch() - blobMaker.startOptimizer() - return &blobMaker, nil } // StartNewBatch starts a new batch of blocks. func (bm *BlobMaker) StartNewBatch() { - bm.obtainLock() - defer bm.releaseLock() - bm.startNewBatch() -} - -// thread-unsafe version of StartNewBatch -// to be called from a method with lock ownership -func (bm *BlobMaker) startNewBatch() { - bm.modificationNum++ bm.header.sealBatch() } // Reset resets the bm to its initial state. func (bm *BlobMaker) Reset() { - bm.obtainLock() - defer bm.releaseLock() - bm.modificationNum++ - bm.header.resetTable() bm.currentBlobLength = 0 bm.buf.Reset() bm.packBuffer.Reset() bm.compressor.Reset() - bm.startNewBatch() + bm.header.sealBatch() } // Len returns the length of the compressed data, which includes the header. @@ -135,9 +112,6 @@ func (bm *BlobMaker) Written() int { // Bytes returns the compressed data. Note that it returns a slice of the internal buffer, // it is the caller's responsibility to copy the data if needed. func (bm *BlobMaker) Bytes() []byte { - bm.obtainLock() // to make sure no one (the optimizer specifically) is modifying the blob, possibly resulting in corrupted data - defer bm.releaseLock() - if bm.currentBlobLength > 0 { // sanity check that we can always decompress. header, rawBlocks, _, err := DecompressBlob(bm.currentBlob[:bm.currentBlobLength], bm.dictStore) @@ -164,9 +138,7 @@ func (bm *BlobMaker) Bytes() []byte { // Write attempts to append the RLP block to the current batch. // if forceReset is set; this will NOT append the bytes but still returns true if the chunk could have been appended func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error) { - bm.obtainLock() - defer bm.releaseLock() - bm.modificationNum++ + prevLen := bm.compressor.Written() // decode the RLP block. var block types.Block @@ -195,7 +167,6 @@ func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error if innerErr := bm.compressor.Revert(); innerErr != nil { return false, fmt.Errorf("when reverting compressor because writing failed: %w\noriginal error: %w", innerErr, err) } - bm.header.removeLastBlock() return false, fmt.Errorf("when writing block to compressor: %w", err) } @@ -221,32 +192,64 @@ func (bm *BlobMaker) Write(rlpBlock []byte, forceReset bool) (ok bool, err error return false, nil } + fitsInBlob := func() bool { + return encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit + } + + payload := bm.compressor.WrittenBytes() + recompressionAttempted := false + revert := func() error { // from this point on, we may have recompressed the entire payload in one go + // that makes the compressor's own Revert method unusable. + bm.header.removeLastBlock() + if !recompressionAttempted { // fast path for most "CanWrite" calls + return bm.compressor.Revert() + } + // we can't use the compressor's own Revert method because we tried to compress in one go. + bm.compressor.Reset() + _, err := bm.compressor.Write(payload[:prevLen]) + return wrapError(err, "reverting the compressor") + } + // check that the header + the compressed data fits in the blob - fitsInBlob := encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit - if !fitsInBlob { - // first thing to check is if we bypass compression, would that fit? 
+ if !fitsInBlob() { + recompressionAttempted = true + + // first thing to check is whether we can fit the block if we recompress everything in one go, known to achieve a higher ratio. + bm.compressor.Reset() + if _, err = bm.compressor.Write(payload); err != nil { + err = fmt.Errorf("when recompressing the blob: %w", err) + + if innerErr := revert(); innerErr != nil { + err = fmt.Errorf("%w\n\tto recover from write failure: %w", innerErr, err) + } + + return false, err + } + if fitsInBlob() { + goto bypass + } + + // that didn't work. a "desperate" attempt is not to compress at all. if bm.compressor.ConsiderBypassing() { // we can bypass compression and get a better ratio. // let's check if now we fit in the blob. - if encode.PackAlignSize(bm.buf.Len()+bm.compressor.Len(), fr381.Bits-1) <= bm.Limit { + if fitsInBlob() { goto bypass } } // discard. - if err = bm.compressor.Revert(); err != nil { + if err = revert(); err != nil { return false, fmt.Errorf("when reverting compressor because blob is full: %w", err) } - bm.header.removeLastBlock() return false, nil } bypass: if forceReset { // we don't want to append the data, but we could have. - if err = bm.compressor.Revert(); err != nil { - return false, fmt.Errorf("when reverting compressor (blob is not full but forceReset == true): %w", err) + if err = revert(); err != nil { + return false, fmt.Errorf("%w\nreverting because forceReset == true even though the blob isn't full", err) } - bm.header.removeLastBlock() return true, nil } @@ -254,14 +257,16 @@ bypass: bm.packBuffer.Reset() n2, err := encode.PackAlign(&bm.packBuffer, bm.buf.Bytes(), fr381.Bits-1, encode.WithAdditionalInput(bm.compressor.Bytes())) if err != nil { - bm.compressor.Revert() - bm.header.removeLastBlock() + err = fmt.Errorf("when packing blob: %w", err) + innerErr := revert() + if innerErr != nil { + err = fmt.Errorf("%w\n\twhen attempting to recover from: %w", innerErr, err) + } return false, fmt.Errorf("when packing blob: %w", err) } bm.currentBlobLength = int(n2) copy(bm.currentBlob[:bm.currentBlobLength], bm.packBuffer.Bytes()) - bm.tryOptimize() return true, nil } @@ -438,114 +443,9 @@ func (bm *BlobMaker) RawCompressedSize(data []byte) (int, error) { return n, nil } -func (bm *BlobMaker) tryOptimize() { - if bm.doOptimize != nil && len(bm.doOptimize) == 0 { - bm.doOptimize <- struct{}{} +func wrapError(err error, format string, args ...any) error { + if err == nil { + return nil } -} - -func (bm *BlobMaker) startOptimizer() { - bm.doOptimize = make(chan struct{}, 1) - go func() { - var headerBuffer, packingBuffer bytes.Buffer - packingBuffer.Grow(MaxUsableBytes) // used for packing and holding the raw payload - compressor, err := lzss.NewCompressor(bm.dict) - if err != nil { - logrus.WithError(err).Error("optimizer failed to create compressor") - return - } - - for range bm.doOptimize { - newData: - - modificationNum := bm.modificationNum - if bm.optimizerModificationNum == modificationNum { - continue // already optimized - } - if bm.optimizerModificationNum > modificationNum { - panic("optimizer modification num is ahead of the actual modification num") - } - - headerBuffer.Reset() - packingBuffer.Reset() - compressor.Reset() - - if _, err = bm.header.WriteTo(&headerBuffer); err != nil { - logrus.WithError(err).Error("optimizer failed to write header to buffer") - continue - } - - if _, err = compressor.Write(bm.compressor.WrittenBytes()); err != nil { - logrus.WithError(err).Error("optimizer failed to write compressor data") - continue - } - - n, err := 
encode.PackAlign(&packingBuffer, headerBuffer.Bytes(), fr381.Bits-1, encode.WithAdditionalInput(compressor.Bytes())) - if err != nil { - logrus.WithError(err).Error("optimizer failed to pack align") - continue - } - - // only go ahead with this if we're getting a better compression ratio - if bm.compressor.Len() < compressor.Len() { - logrus.Info("optimizer getting a worse compression ratio") - continue - } - - if n > MaxUsableBytes { - logrus.Info("optimizer outgrowing blob capacity") - continue - } - - // decompress and validate - // they may not be as catastrophic as they seem - // could be caused by changes to the blob as this method was running - header, payload, _, err := DecompressBlob(packingBuffer.Bytes(), bm.dictStore) - if err != nil { - logrus.WithError(err).Warn("optimizer failed to decompress blob", base64.StdEncoding.EncodeToString(packingBuffer.Bytes())) - continue - } - - if !header.Equals(&bm.header) { - logrus.Warn("optimizer header mismatch") - goto newData // assuming this is not actually a compression error, but caused by changes to the blob - } - - if !bytes.Equal(payload, bm.compressor.WrittenBytes()) { - logrus.Warn("optimizer payload mismatch") - goto newData - } - - bm.obtainLock() - - // blob has been modified since the optimizer started - if bm.modificationNum != modificationNum { - logrus.Warn("optimized data version obsolete") - bm.releaseLock() - goto newData - } - - // swap the compressors (our current one has the payload) - bm.compressor, compressor = compressor, bm.compressor - - // store the new blob - bm.currentBlobLength = int(n) - copy(bm.currentBlob[:n], packingBuffer.Bytes()) - - bm.releaseLock() - - bm.optimizerModificationNum = modificationNum - logrus.Info("optimizer success") - } - }() -} - -// obtainLock and releaseLock wrap bm.lock.Lock and bm.lock.Unlock respectively, in order to enable -// logging for debugging purposes. Normally (hopefully?) they would be inlined by the compiler. -func (bm *BlobMaker) obtainLock() { - bm.lock.Lock() -} - -func (bm *BlobMaker) releaseLock() { - bm.lock.Unlock() + return fmt.Errorf(format+": %w", append(args, err)...) }
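
After the final patch, BlobMaker exposes a purely synchronous API: Write itself retries with a full recompression (and, as a last resort, a compression bypass) when the blob would overflow, so callers no longer start or stop an optimizer. The following is a minimal usage sketch, not part of the patches: the 1000-byte data limit, the dictionary path, and the blob.GetRepoRootPath helper are taken from the package's test utilities (blob_maker_testing.go), the block source is a placeholder, and error handling is reduced to panics.

package main

import (
	"fmt"
	"path/filepath"

	"github.com/consensys/linea-monorepo/prover/lib/compressor/blob"
	v1 "github.com/consensys/linea-monorepo/prover/lib/compressor/blob/v1"
)

func main() {
	// locate the repository dictionary the same way the test utilities do
	repoRoot, err := blob.GetRepoRootPath()
	if err != nil {
		panic(err)
	}

	// data limit and dictionary path as used in test_utils; a real deployment would size the limit to the blob capacity
	bm, err := v1.NewBlobMaker(1000, filepath.Join(repoRoot, "prover/lib/compressor/compressor_dict.bin"))
	if err != nil {
		panic(err)
	}

	blocks := [][]byte{} // placeholder: RLP-encoded blocks obtained elsewhere

	for _, rlpBlock := range blocks {
		ok, err := bm.Write(rlpBlock, false)
		if err != nil {
			panic(err)
		}
		if !ok { // the block did not fit: seal the current blob, reset, and retry the block
			sealed := append([]byte(nil), bm.Bytes()...) // copy, since Bytes returns a view of an internal buffer
			fmt.Println("sealed blob of", len(sealed), "bytes")
			bm.Reset()
			if ok, err = bm.Write(rlpBlock, false); err != nil || !ok {
				panic("block does not fit even in an empty blob")
			}
		}
	}

	bm.StartNewBatch()                        // seal the current batch
	fmt.Println("final blob size:", bm.Len()) // compressed data length, including the header
}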