From ee8f5e87aaf688324fc2f71dcb7126a7af4c07b9 Mon Sep 17 00:00:00 2001 From: Nazarii Denha Date: Mon, 7 Oct 2024 09:52:52 +0200 Subject: [PATCH 1/2] implement L1MsgStorage --- core/rawdb/accessors_l1_message.go | 19 ++ core/rawdb/schema.go | 2 + rollup/l1/msg_storage.go | 262 ++++++++++++++++++++ rollup/l1/reader.go | 17 +- rollup/rollup_sync_service/l1client_test.go | 6 + rollup/sync_service/types.go | 1 + 6 files changed, 294 insertions(+), 13 deletions(-) create mode 100644 rollup/l1/msg_storage.go diff --git a/core/rawdb/accessors_l1_message.go b/core/rawdb/accessors_l1_message.go index 4ae27c7b8ff0..7fbd7f7cc5c6 100644 --- a/core/rawdb/accessors_l1_message.go +++ b/core/rawdb/accessors_l1_message.go @@ -275,3 +275,22 @@ func ReadFirstQueueIndexNotInL2Block(db ethdb.Reader, l2BlockHash common.Hash) * queueIndex := binary.BigEndian.Uint64(data) return &queueIndex } + +// WriteL1MsgStorageState writes the L1MsgStorage state +func WriteL1MsgStorageState(db ethdb.KeyValueWriter, state []byte) { + if err := db.Put(l1MsgStorageStateKey, state); err != nil { + log.Crit("Failed to update L1MsgStorage state", "err", err) + } +} + +// ReadL1MsgStorageState retrieves the L1MsgStorage state +func ReadL1MsgStorageState(db ethdb.Reader) []byte { + data, err := db.Get(l1MsgStorageStateKey) + if err != nil && isNotFoundErr(err) { + return nil + } + if err != nil { + log.Crit("Failed to read highest synced L1 message queue index from database", "err", err) + } + return data +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 19883114fd12..7573ec280112 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -147,6 +147,8 @@ var ( firstQueueIndexNotInL2BlockPrefix = []byte("q") // firstQueueIndexNotInL2BlockPrefix + L2 block hash -> enqueue index highestSyncedQueueIndexKey = []byte("HighestSyncedQueueIndex") + l1MsgStorageStateKey = []byte("L1MsgStorageState") + // Scroll rollup event store rollupEventSyncedL1BlockNumberKey = []byte("R-LastRollupEventSyncedL1BlockNumber") batchChunkRangesPrefix = []byte("R-bcr") diff --git a/rollup/l1/msg_storage.go b/rollup/l1/msg_storage.go new file mode 100644 index 000000000000..e3eb447804fe --- /dev/null +++ b/rollup/l1/msg_storage.go @@ -0,0 +1,262 @@ +package l1 + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/scroll-tech/go-ethereum/common" + "github.com/scroll-tech/go-ethereum/core/types" + "github.com/scroll-tech/go-ethereum/log" +) + +const ( + defaultFetchInterval = 5 * time.Second +) + +type MsgStorageState struct { + StartBlockHeader *types.Header + EndBlockHeader *types.Header +} + +type MsgStorage struct { + state MsgStorageState + + ctx context.Context + cancel context.CancelFunc + + msgs *common.ShrinkingMap[uint64, storedL1Message] + reader *Reader + unsubscribeTracker func() + newChainNotifications []newChainNotification + + msgsMu sync.RWMutex + notifsMu sync.Mutex +} + +func NewMsgStorage(ctx context.Context, tracker *Tracker, reader *Reader) (*MsgStorage, error) { + if tracker == nil || reader == nil { + return nil, fmt.Errorf("failed to create MsgStorage, reader or tracker is nil") + } + ctx, cancel := context.WithCancel(ctx) + msgStorage := &MsgStorage{ + ctx: ctx, + cancel: cancel, + msgs: common.NewShrinkingMap[uint64, storedL1Message](1000), + reader: reader, + } + msgStorage.unsubscribeTracker = tracker.Subscribe(LatestChainHead, func(old, new []*types.Header) { + msgStorage.notifsMu.Lock() + defer msgStorage.notifsMu.Unlock() + msgStorage.newChainNotifications = append(msgStorage.newChainNotifications, newChainNotification{old, new}) + }) + + msgStorage.Start() + return msgStorage, nil +} + +func (ms *MsgStorage) Start() { + log.Info("starting MsgStorage") + go func() { + fetchTicker := time.NewTicker(defaultFetchInterval) + defer fetchTicker.Stop() + + for { + select { + case <-ms.ctx.Done(): + return + default: + } + select { + case <-ms.ctx.Done(): + return + case <-fetchTicker.C: + if len(ms.newChainNotifications) != 0 { + err := ms.fetchMessages() + if err != nil { + log.Warn("MsgStorage: failed to fetch messages", "err", err) + } + } + } + + } + }() +} + +// ReadL1Message retrieves the L1 message corresponding to the enqueue index. +func (ms *MsgStorage) ReadL1Message(queueIndex uint64) *types.L1MessageTx { + ms.msgsMu.RLock() + defer ms.msgsMu.RUnlock() + msg, exists := ms.msgs.Get(queueIndex) + if !exists { + return nil + } + return msg.l1msg +} + +// IterateL1MessagesFrom creates an L1MessageIterator that iterates over +// all L1 message in the MsgStorage starting at the provided enqueue index. +func (ms *MsgStorage) IterateL1MessagesFrom(fromQueueIndex uint64) L1MessageIterator { + return L1MessageIterator{ + curIndex: fromQueueIndex, + msgStorage: ms, + } +} + +// ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`. +func (ms *MsgStorage) ReadL1MessagesFrom(startIndex, maxCount uint64) []types.L1MessageTx { + msgs := make([]types.L1MessageTx, 0, maxCount) + + index := startIndex + count := maxCount + + storedL1Msg, exists := ms.msgs.Get(index) + for count > 0 && exists { + msg := storedL1Msg.l1msg + + // sanity check + if msg.QueueIndex != index { + log.Crit( + "Unexpected QueueIndex in ReadL1MessagesFrom", + "expected", index, + "got", msg.QueueIndex, + "startIndex", startIndex, + "maxCount", maxCount, + ) + } + + msgs = append(msgs, *msg) + index += 1 + count -= 1 + storedL1Msg, exists = ms.msgs.Get(index) + } + + return msgs +} + +func (ms *MsgStorage) fetchMessages() error { + ms.notifsMu.Lock() + notifs := ms.newChainNotifications + ms.newChainNotifications = nil + ms.notifsMu.Unlock() + + // go through all chain notifications and process + for _, newChainNotification := range notifs { + old, new := newChainNotification.old, newChainNotification.new + + // check if there is old chain to delete l1msgs from + if old != nil { + // find msgs that come for reorged chain + ms.msgsMu.RLock() + msgs := ms.msgs.Values() + ms.msgsMu.RUnlock() + var indexesToDelete []uint64 + for _, msg := range msgs { + contains := false + for _, header := range old { + if header.Hash() == msg.headerHash { + contains = true + break + } + } + if contains { + indexesToDelete = append(indexesToDelete, msg.l1msg.QueueIndex) + } + } + if len(indexesToDelete) > 0 { + ms.msgsMu.Lock() + for _, index := range indexesToDelete { + ms.msgs.Delete(index) + } + ms.msgsMu.Unlock() + } + } + + // load messages from new chain + start := new[len(new)-1].Number.Uint64() + end := new[0].Number.Uint64() + events, err := ms.reader.FetchL1MessageEventsInRange(start, end) + if err != nil { + return fmt.Errorf("failed to fetch l1 messages in range, start: %d, end: %d, err: %w", start, end, err) + } + msgsToStore := make([]storedL1Message, len(events)) + for _, event := range events { + msg := &types.L1MessageTx{ + QueueIndex: event.QueueIndex, + Gas: event.GasLimit.Uint64(), + To: &event.Target, + Value: event.Value, + Data: event.Data, + Sender: event.Sender, + } + msgsToStore = append(msgsToStore, storedL1Message{ + l1msg: msg, + headerHash: event.Raw.BlockHash, + }) + } + ms.msgsMu.Lock() + for _, msg := range msgsToStore { + ms.msgs.Set(msg.l1msg.QueueIndex, msg) + } + ms.msgsMu.Unlock() + // update storage state + ms.state.EndBlockHeader = new[0] + if ms.state.StartBlockHeader == nil { + ms.state.StartBlockHeader = new[len(new)-1] + } + } + return nil +} + +// PruneMessages deletes all messages that are older or equal to provided index +func (ms *MsgStorage) PruneMessages(lastIndex uint64) { + ms.msgsMu.Lock() + defer ms.msgsMu.Unlock() + + // todo: update state for graceful restart + deleted := ms.msgs.Delete(lastIndex) + for deleted { + lastIndex-- + deleted = ms.msgs.Delete(lastIndex) + } +} + +func (ms *MsgStorage) Stop() { + log.Info("stopping MsgStorage") + ms.cancel() + log.Info("MsgStorage stopped") +} + +type storedL1Message struct { + l1msg *types.L1MessageTx + headerHash common.Hash +} + +type newChainNotification struct { + old []*types.Header + new []*types.Header +} + +type L1MessageIterator struct { + curIndex uint64 + curMsg *types.L1MessageTx + msgStorage *MsgStorage +} + +// Next moves the iterator to the next key/value pair. +// It returns false when there is no next L1Msg +func (it *L1MessageIterator) Next() bool { + it.curMsg = it.msgStorage.ReadL1Message(it.curIndex) + it.curIndex++ + if it.curMsg == nil { + return false + } else { + return true + } +} + +// L1Message returns the current L1 message. +func (it *L1MessageIterator) L1Message() types.L1MessageTx { + return *it.curMsg +} diff --git a/rollup/l1/reader.go b/rollup/l1/reader.go index dde19b8622a0..2523049c0e3c 100644 --- a/rollup/l1/reader.go +++ b/rollup/l1/reader.go @@ -144,9 +144,8 @@ func (r *Reader) FetchRollupEventsInRange(from, to uint64) (RollupEvents, error) return r.processLogsToRollupEvents(logs) } -func (r *Reader) FetchL1MessagesInRange(fromBlock, toBlock uint64) ([]types.L1MessageTx, error) { - var msgs []types.L1MessageTx - +func (r *Reader) FetchL1MessageEventsInRange(fromBlock, toBlock uint64) ([]*L1MessageQueueQueueTransaction, error) { + var events []*L1MessageQueueQueueTransaction err := r.queryInBatches(fromBlock, toBlock, defaultL1MsgFetchBlockRange, func(from, to uint64) error { it, err := r.filterer.FilterQueueTransaction(&bind.FilterOpts{ Start: from, @@ -163,22 +162,14 @@ func (r *Reader) FetchL1MessagesInRange(fromBlock, toBlock uint64) ([]types.L1Me if !event.GasLimit.IsUint64() { return fmt.Errorf("invalid QueueTransaction event: QueueIndex = %v, GasLimit = %v", event.QueueIndex, event.GasLimit) } - - msgs = append(msgs, types.L1MessageTx{ - QueueIndex: event.QueueIndex, - Gas: event.GasLimit.Uint64(), - To: &event.Target, - Value: event.Value, - Data: event.Data, - Sender: event.Sender, - }) + events = append(events, event) } return it.Error() }) if err != nil { return nil, err } - return msgs, nil + return events, nil } func (r *Reader) processLogsToRollupEvents(logs []types.Log) (RollupEvents, error) { diff --git a/rollup/rollup_sync_service/l1client_test.go b/rollup/rollup_sync_service/l1client_test.go index 38719d220f62..c6a92792cd38 100644 --- a/rollup/rollup_sync_service/l1client_test.go +++ b/rollup/rollup_sync_service/l1client_test.go @@ -57,6 +57,12 @@ func (m *mockEthClient) HeaderByNumber(ctx context.Context, number *big.Int) (*t }, nil } +func (m *mockEthClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + return &types.Header{ + Number: big.NewInt(100 - 64), + }, nil +} + func (m *mockEthClient) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { return nil, nil } diff --git a/rollup/sync_service/types.go b/rollup/sync_service/types.go index 3429ec1bb778..e838b039df85 100644 --- a/rollup/sync_service/types.go +++ b/rollup/sync_service/types.go @@ -16,6 +16,7 @@ type EthClient interface { ChainID(ctx context.Context) (*big.Int, error) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) + HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) TransactionByHash(ctx context.Context, txHash common.Hash) (tx *types.Transaction, isPending bool, err error) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) From f12698243da0d6c7397b50905a2762e688de9705 Mon Sep 17 00:00:00 2001 From: Nazarii Denha Date: Mon, 21 Oct 2024 08:56:46 +0200 Subject: [PATCH 2/2] msg storage usage, address comments --- rollup/da_syncer/da/calldata_blob_source.go | 17 +++--- rollup/da_syncer/da/commitV0.go | 15 +++-- rollup/da_syncer/da/commitV1.go | 9 ++- rollup/da_syncer/da/commitV2.go | 5 +- rollup/da_syncer/da/commitV4.go | 5 +- rollup/da_syncer/data_source.go | 8 +-- rollup/da_syncer/syncing_pipeline.go | 17 +++++- rollup/l1/msg_storage.go | 65 +++++++++++---------- rollup/l1/tracker.go | 4 +- rollup/l1/tracker_test.go | 2 +- rollup/l1/types.go | 4 +- 11 files changed, 82 insertions(+), 69 deletions(-) diff --git a/rollup/da_syncer/da/calldata_blob_source.go b/rollup/da_syncer/da/calldata_blob_source.go index 451e06b7d2a2..bc4b47f79c33 100644 --- a/rollup/da_syncer/da/calldata_blob_source.go +++ b/rollup/da_syncer/da/calldata_blob_source.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/scroll-tech/go-ethereum/accounts/abi" - "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" "github.com/scroll-tech/go-ethereum/rollup/l1" @@ -34,12 +33,12 @@ type CalldataBlobSource struct { blobClient blob_client.BlobClient l1height uint64 scrollChainABI *abi.ABI - db ethdb.Database + msgStorage *l1.MsgStorage l1Finalized uint64 } -func NewCalldataBlobSource(ctx context.Context, l1height uint64, l1Reader *l1.Reader, blobClient blob_client.BlobClient, db ethdb.Database) (*CalldataBlobSource, error) { +func NewCalldataBlobSource(ctx context.Context, l1height uint64, l1Reader *l1.Reader, blobClient blob_client.BlobClient, msgStorage *l1.MsgStorage) (*CalldataBlobSource, error) { scrollChainABI, err := l1.ScrollChainMetaData.GetAbi() if err != nil { return nil, fmt.Errorf("failed to get scroll chain abi: %w", err) @@ -50,7 +49,7 @@ func NewCalldataBlobSource(ctx context.Context, l1height uint64, l1Reader *l1.Re blobClient: blobClient, l1height: l1height, scrollChainABI: scrollChainABI, - db: db, + msgStorage: msgStorage, }, nil } @@ -185,11 +184,11 @@ func (ds *CalldataBlobSource) getCommitBatchDA(batchIndex uint64, commitEvent *l } switch args.Version { case 0: - return NewCommitBatchDAV0(ds.db, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap, commitEvent.BlockNumber()) + return NewCommitBatchDAV0(ds.msgStorage, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap, commitEvent.BlockNumber()) case 1: - return NewCommitBatchDAV1(ds.ctx, ds.db, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) + return NewCommitBatchDAV1(ds.ctx, ds.msgStorage, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) case 2: - return NewCommitBatchDAV2(ds.ctx, ds.db, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) + return NewCommitBatchDAV2(ds.ctx, ds.msgStorage, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) default: return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version) } @@ -201,9 +200,9 @@ func (ds *CalldataBlobSource) getCommitBatchDA(batchIndex uint64, commitEvent *l switch args.Version { case 3: // we can use V2 for version 3, because it's same - return NewCommitBatchDAV2(ds.ctx, ds.db, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) + return NewCommitBatchDAV2(ds.ctx, ds.msgStorage, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) case 4: - return NewCommitBatchDAV4(ds.ctx, ds.db, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) + return NewCommitBatchDAV4(ds.ctx, ds.msgStorage, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap) default: return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version) } diff --git a/rollup/da_syncer/da/commitV0.go b/rollup/da_syncer/da/commitV0.go index 66a13786c9cb..bd8d30207350 100644 --- a/rollup/da_syncer/da/commitV0.go +++ b/rollup/da_syncer/da/commitV0.go @@ -7,10 +7,9 @@ import ( "github.com/scroll-tech/da-codec/encoding" "github.com/scroll-tech/da-codec/encoding/codecv0" - "github.com/scroll-tech/go-ethereum/core/rawdb" "github.com/scroll-tech/go-ethereum/core/types" - "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors" + "github.com/scroll-tech/go-ethereum/rollup/l1" ) type CommitBatchDAV0 struct { @@ -24,7 +23,7 @@ type CommitBatchDAV0 struct { l1BlockNumber uint64 } -func NewCommitBatchDAV0(db ethdb.Database, +func NewCommitBatchDAV0(msgStorage *l1.MsgStorage, version uint8, batchIndex uint64, parentBatchHeader []byte, @@ -37,10 +36,10 @@ func NewCommitBatchDAV0(db ethdb.Database, return nil, fmt.Errorf("failed to unpack chunks: %d, err: %w", batchIndex, err) } - return NewCommitBatchDAV0WithChunks(db, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, l1BlockNumber) + return NewCommitBatchDAV0WithChunks(msgStorage, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, l1BlockNumber) } -func NewCommitBatchDAV0WithChunks(db ethdb.Database, +func NewCommitBatchDAV0WithChunks(msgStorage *l1.MsgStorage, version uint8, batchIndex uint64, parentBatchHeader []byte, @@ -49,7 +48,7 @@ func NewCommitBatchDAV0WithChunks(db ethdb.Database, l1BlockNumber uint64, ) (*CommitBatchDAV0, error) { parentTotalL1MessagePopped := getBatchTotalL1MessagePopped(parentBatchHeader) - l1Txs, err := getL1Messages(db, parentTotalL1MessagePopped, skippedL1MessageBitmap, getTotalMessagesPoppedFromChunks(decodedChunks)) + l1Txs, err := getL1Messages(msgStorage, parentTotalL1MessagePopped, skippedL1MessageBitmap, getTotalMessagesPoppedFromChunks(decodedChunks)) if err != nil { return nil, fmt.Errorf("failed to get L1 messages for v0 batch %d: %w", batchIndex, err) } @@ -139,7 +138,7 @@ func getTotalMessagesPoppedFromChunks(decodedChunks []*codecv0.DAChunkRawTx) int return totalL1MessagePopped } -func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skippedBitmap []byte, totalL1MessagePopped int) ([]*types.L1MessageTx, error) { +func getL1Messages(msgStorage *l1.MsgStorage, parentTotalL1MessagePopped uint64, skippedBitmap []byte, totalL1MessagePopped int) ([]*types.L1MessageTx, error) { var txs []*types.L1MessageTx decodedSkippedBitmap, err := encoding.DecodeBitmap(skippedBitmap, totalL1MessagePopped) @@ -154,7 +153,7 @@ func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skipped currentIndex++ continue } - l1Tx := rawdb.ReadL1Message(db, currentIndex) + l1Tx := msgStorage.ReadL1Message(currentIndex) if l1Tx == nil { // message not yet available // we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry diff --git a/rollup/da_syncer/da/commitV1.go b/rollup/da_syncer/da/commitV1.go index 31ad2a668084..5109b310b503 100644 --- a/rollup/da_syncer/da/commitV1.go +++ b/rollup/da_syncer/da/commitV1.go @@ -13,14 +13,13 @@ import ( "github.com/scroll-tech/go-ethereum/common" "github.com/scroll-tech/go-ethereum/crypto/kzg4844" - "github.com/scroll-tech/go-ethereum/ethdb" ) type CommitBatchDAV1 struct { *CommitBatchDAV0 } -func NewCommitBatchDAV1(ctx context.Context, db ethdb.Database, +func NewCommitBatchDAV1(ctx context.Context, msgStorage *l1.MsgStorage, l1Reader *l1.Reader, blobClient blob_client.BlobClient, commitEvent *l1.CommitBatchEvent, @@ -30,10 +29,10 @@ func NewCommitBatchDAV1(ctx context.Context, db ethdb.Database, chunks [][]byte, skippedL1MessageBitmap []byte, ) (*CommitBatchDAV1, error) { - return NewCommitBatchDAV1WithBlobDecodeFunc(ctx, db, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv1.DecodeTxsFromBlob) + return NewCommitBatchDAV1WithBlobDecodeFunc(ctx, msgStorage, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv1.DecodeTxsFromBlob) } -func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, db ethdb.Database, +func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, msgStorage *l1.MsgStorage, l1Reader *l1.Reader, blobClient blob_client.BlobClient, commitEvent *l1.CommitBatchEvent, @@ -82,7 +81,7 @@ func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, db ethdb.Database return nil, fmt.Errorf("failed to decode txs from blob: %w", err) } - v0, err := NewCommitBatchDAV0WithChunks(db, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, commitEvent.BlockNumber()) + v0, err := NewCommitBatchDAV0WithChunks(msgStorage, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, commitEvent.BlockNumber()) if err != nil { return nil, err } diff --git a/rollup/da_syncer/da/commitV2.go b/rollup/da_syncer/da/commitV2.go index 56110ff739bc..6231cf2e57f0 100644 --- a/rollup/da_syncer/da/commitV2.go +++ b/rollup/da_syncer/da/commitV2.go @@ -5,7 +5,6 @@ import ( "github.com/scroll-tech/da-codec/encoding/codecv2" - "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" "github.com/scroll-tech/go-ethereum/rollup/l1" ) @@ -14,7 +13,7 @@ type CommitBatchDAV2 struct { *CommitBatchDAV1 } -func NewCommitBatchDAV2(ctx context.Context, db ethdb.Database, +func NewCommitBatchDAV2(ctx context.Context, msgStorage *l1.MsgStorage, l1Reader *l1.Reader, blobClient blob_client.BlobClient, commitEvent *l1.CommitBatchEvent, @@ -25,7 +24,7 @@ func NewCommitBatchDAV2(ctx context.Context, db ethdb.Database, skippedL1MessageBitmap []byte, ) (*CommitBatchDAV2, error) { - v1, err := NewCommitBatchDAV1WithBlobDecodeFunc(ctx, db, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv2.DecodeTxsFromBlob) + v1, err := NewCommitBatchDAV1WithBlobDecodeFunc(ctx, msgStorage, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv2.DecodeTxsFromBlob) if err != nil { return nil, err } diff --git a/rollup/da_syncer/da/commitV4.go b/rollup/da_syncer/da/commitV4.go index 3d3366386756..324c9eb4a87d 100644 --- a/rollup/da_syncer/da/commitV4.go +++ b/rollup/da_syncer/da/commitV4.go @@ -5,7 +5,6 @@ import ( "github.com/scroll-tech/da-codec/encoding/codecv4" - "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" "github.com/scroll-tech/go-ethereum/rollup/l1" ) @@ -14,7 +13,7 @@ type CommitBatchDAV4 struct { *CommitBatchDAV1 } -func NewCommitBatchDAV4(ctx context.Context, db ethdb.Database, +func NewCommitBatchDAV4(ctx context.Context, msgStorage *l1.MsgStorage, l1Reader *l1.Reader, blobClient blob_client.BlobClient, commitEvent *l1.CommitBatchEvent, @@ -25,7 +24,7 @@ func NewCommitBatchDAV4(ctx context.Context, db ethdb.Database, skippedL1MessageBitmap []byte, ) (*CommitBatchDAV2, error) { - v1, err := NewCommitBatchDAV1WithBlobDecodeFunc(ctx, db, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv4.DecodeTxsFromBlob) + v1, err := NewCommitBatchDAV1WithBlobDecodeFunc(ctx, msgStorage, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv4.DecodeTxsFromBlob) if err != nil { return nil, err } diff --git a/rollup/da_syncer/data_source.go b/rollup/da_syncer/data_source.go index 582b72037c32..f456d55d096c 100644 --- a/rollup/da_syncer/data_source.go +++ b/rollup/da_syncer/data_source.go @@ -5,7 +5,6 @@ import ( "errors" "github.com/scroll-tech/go-ethereum/core" - "github.com/scroll-tech/go-ethereum/ethdb" "github.com/scroll-tech/go-ethereum/params" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client" "github.com/scroll-tech/go-ethereum/rollup/da_syncer/da" @@ -22,22 +21,21 @@ type DataSourceFactory struct { genesisConfig *params.ChainConfig l1Reader *l1.Reader blobClient blob_client.BlobClient - db ethdb.Database + msgStorage *l1.MsgStorage } -func NewDataSourceFactory(blockchain *core.BlockChain, genesisConfig *params.ChainConfig, config Config, l1Reader *l1.Reader, blobClient blob_client.BlobClient, db ethdb.Database) *DataSourceFactory { +func NewDataSourceFactory(blockchain *core.BlockChain, genesisConfig *params.ChainConfig, config Config, l1Reader *l1.Reader, msgStorage *l1.MsgStorage, blobClient blob_client.BlobClient) *DataSourceFactory { return &DataSourceFactory{ config: config, genesisConfig: genesisConfig, l1Reader: l1Reader, blobClient: blobClient, - db: db, } } func (ds *DataSourceFactory) OpenDataSource(ctx context.Context, l1height uint64) (DataSource, error) { if ds.config.FetcherMode == L1RPC { - return da.NewCalldataBlobSource(ctx, l1height, ds.l1Reader, ds.blobClient, ds.db) + return da.NewCalldataBlobSource(ctx, l1height, ds.l1Reader, ds.blobClient, ds.msgStorage) } else { return nil, errors.New("snapshot_data_source: not implemented") } diff --git a/rollup/da_syncer/syncing_pipeline.go b/rollup/da_syncer/syncing_pipeline.go index c27ee6733a9d..6fb25d6d80aa 100644 --- a/rollup/da_syncer/syncing_pipeline.go +++ b/rollup/da_syncer/syncing_pipeline.go @@ -36,6 +36,8 @@ type SyncingPipeline struct { expBackoff *backoff.Exponential l1DeploymentBlock uint64 + l1Tracker *l1.Tracker + msgStorage *l1.MsgStorage db ethdb.Database blockchain *core.BlockChain @@ -51,6 +53,12 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi if err != nil { return nil, fmt.Errorf("failed to initialize l1.Reader, err = %w", err) } + l1Tracker := l1.NewTracker(ctx, ethClient, blockchain.Genesis().Hash()) + + msgStorage, err := l1.NewMsgStorage(ctx, l1Tracker, l1Reader, l1.LatestChainHead) + if err != nil { + return nil, fmt.Errorf("failed to initialize msg storage, err = %w", err) + } blobClientList := blob_client.NewBlobClientList() if config.BeaconNodeAPIEndpoint != "" { @@ -71,7 +79,7 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi log.Crit("DA syncing is enabled but no blob client is configured. Please provide at least one blob client via command line flag.") } - dataSourceFactory := NewDataSourceFactory(blockchain, genesisConfig, config, l1Reader, blobClientList, db) + dataSourceFactory := NewDataSourceFactory(blockchain, genesisConfig, config, l1Reader, msgStorage, blobClientList) syncedL1Height := l1DeploymentBlock - 1 from := rawdb.ReadDASyncedL1BlockNumber(db) if from != nil { @@ -90,6 +98,8 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi expBackoff: backoff.NewExponential(100*time.Millisecond, 10*time.Second, 100*time.Millisecond), wg: sync.WaitGroup{}, l1DeploymentBlock: l1DeploymentBlock, + l1Tracker: l1Tracker, + msgStorage: msgStorage, db: db, blockchain: blockchain, blockQueue: blockQueue, @@ -109,6 +119,9 @@ func (s *SyncingPipeline) Step() error { func (s *SyncingPipeline) Start() { log.Info("sync from DA: starting pipeline") + s.msgStorage.Start() + s.l1Tracker.Start() + s.wg.Add(1) go func() { s.mainLoop() @@ -213,6 +226,8 @@ func (s *SyncingPipeline) mainLoop() { func (s *SyncingPipeline) Stop() { log.Info("sync from DA: stopping pipeline...") s.cancel() + s.msgStorage.Stop() + s.l1Tracker.Stop() s.wg.Wait() log.Info("sync from DA: stopping pipeline... done") } diff --git a/rollup/l1/msg_storage.go b/rollup/l1/msg_storage.go index e3eb447804fe..b8b267563eb9 100644 --- a/rollup/l1/msg_storage.go +++ b/rollup/l1/msg_storage.go @@ -28,36 +28,36 @@ type MsgStorage struct { msgs *common.ShrinkingMap[uint64, storedL1Message] reader *Reader + tracker *Tracker + confirmationRule ConfirmationRule unsubscribeTracker func() - newChainNotifications []newChainNotification + newChainNotifications chan newChainNotification - msgsMu sync.RWMutex - notifsMu sync.Mutex + msgsMu sync.RWMutex } -func NewMsgStorage(ctx context.Context, tracker *Tracker, reader *Reader) (*MsgStorage, error) { +func NewMsgStorage(ctx context.Context, tracker *Tracker, reader *Reader, confirmationRule ConfirmationRule) (*MsgStorage, error) { if tracker == nil || reader == nil { return nil, fmt.Errorf("failed to create MsgStorage, reader or tracker is nil") } ctx, cancel := context.WithCancel(ctx) msgStorage := &MsgStorage{ - ctx: ctx, - cancel: cancel, - msgs: common.NewShrinkingMap[uint64, storedL1Message](1000), - reader: reader, + ctx: ctx, + cancel: cancel, + msgs: common.NewShrinkingMap[uint64, storedL1Message](1000), + reader: reader, + tracker: tracker, + confirmationRule: confirmationRule, + newChainNotifications: make(chan newChainNotification, 10), } - msgStorage.unsubscribeTracker = tracker.Subscribe(LatestChainHead, func(old, new []*types.Header) { - msgStorage.notifsMu.Lock() - defer msgStorage.notifsMu.Unlock() - msgStorage.newChainNotifications = append(msgStorage.newChainNotifications, newChainNotification{old, new}) - }) - - msgStorage.Start() return msgStorage, nil } func (ms *MsgStorage) Start() { log.Info("starting MsgStorage") + ms.unsubscribeTracker = ms.tracker.Subscribe(ms.confirmationRule, func(old, new []*types.Header) { + ms.newChainNotifications <- newChainNotification{old, new} + }, 64) go func() { fetchTicker := time.NewTicker(defaultFetchInterval) defer fetchTicker.Stop() @@ -106,16 +106,18 @@ func (ms *MsgStorage) IterateL1MessagesFrom(fromQueueIndex uint64) L1MessageIter // ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`. func (ms *MsgStorage) ReadL1MessagesFrom(startIndex, maxCount uint64) []types.L1MessageTx { + if maxCount == 0 { + return []types.L1MessageTx{} + } msgs := make([]types.L1MessageTx, 0, maxCount) - index := startIndex - count := maxCount - - storedL1Msg, exists := ms.msgs.Get(index) - for count > 0 && exists { + for index := startIndex; len(msgs) < int(maxCount); index++ { + storedL1Msg, exists := ms.msgs.Get(index) + if !exists { + break // No more messages to read + } msg := storedL1Msg.l1msg - - // sanity check + // Sanity check for QueueIndex if msg.QueueIndex != index { log.Crit( "Unexpected QueueIndex in ReadL1MessagesFrom", @@ -125,21 +127,22 @@ func (ms *MsgStorage) ReadL1MessagesFrom(startIndex, maxCount uint64) []types.L1 "maxCount", maxCount, ) } - msgs = append(msgs, *msg) - index += 1 - count -= 1 - storedL1Msg, exists = ms.msgs.Get(index) } - return msgs } func (ms *MsgStorage) fetchMessages() error { - ms.notifsMu.Lock() - notifs := ms.newChainNotifications - ms.newChainNotifications = nil - ms.notifsMu.Unlock() + var notifs []newChainNotification +out: + for { + select { + case msg := <-ms.newChainNotifications: + notifs = append(notifs, msg) + default: + break out + } + } // go through all chain notifications and process for _, newChainNotification := range notifs { diff --git a/rollup/l1/tracker.go b/rollup/l1/tracker.go index 73633f810cb7..b6b030d7e295 100644 --- a/rollup/l1/tracker.go +++ b/rollup/l1/tracker.go @@ -408,7 +408,7 @@ func (t *Tracker) chain(start, end *types.Header, includeStart bool) []*types.He return chain } -func (t *Tracker) Subscribe(confirmationRule ConfirmationRule, callback SubscriptionCallback) (unsubscribe func()) { +func (t *Tracker) Subscribe(confirmationRule ConfirmationRule, callback SubscriptionCallback, maxHeadersSent int) (unsubscribe func()) { t.mu.Lock() defer t.mu.Unlock() @@ -425,7 +425,7 @@ func (t *Tracker) Subscribe(confirmationRule ConfirmationRule, callback Subscrip panic(fmt.Sprintf("invalid confirmation rule %d", confirmationRule)) } - sub := newSubscription(t.subscriptionCounter, confirmationRule, callback) + sub := newSubscription(t.subscriptionCounter, confirmationRule, callback, maxHeadersSent) subscriptionsByType := t.subscriptions[confirmationType] subscriptionsByType = append(subscriptionsByType, sub) diff --git a/rollup/l1/tracker_test.go b/rollup/l1/tracker_test.go index 954a625f4c2b..11e79bc2b1b2 100644 --- a/rollup/l1/tracker_test.go +++ b/rollup/l1/tracker_test.go @@ -171,7 +171,7 @@ func newSubscriptionCalls(tracker *Tracker, alias string, rule ConfirmationRule) unsubscribe := tracker.Subscribe(rule, func(old, new []*types.Header) { s.addActual(old, new) - }) + }, 0) return s, unsubscribe } diff --git a/rollup/l1/types.go b/rollup/l1/types.go index ed4535d42cdc..998078eb3d6a 100644 --- a/rollup/l1/types.go +++ b/rollup/l1/types.go @@ -47,13 +47,15 @@ type subscription struct { confirmationRule ConfirmationRule callback SubscriptionCallback lastSentHeader *types.Header + maxHeadersSent int // number of headers that could be sent at the time } -func newSubscription(id int, confirmationRule ConfirmationRule, callback SubscriptionCallback) *subscription { +func newSubscription(id int, confirmationRule ConfirmationRule, callback SubscriptionCallback, maxHeadersSent int) *subscription { return &subscription{ id: id, confirmationRule: confirmationRule, callback: callback, + maxHeadersSent: maxHeadersSent, } }