Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/l1 msg queue #1055

Draft
wants to merge 3 commits into
base: feat/l1-state-tracker
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions core/rawdb/accessors_l1_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,22 @@ func ReadFirstQueueIndexNotInL2Block(db ethdb.Reader, l2BlockHash common.Hash) *
queueIndex := binary.BigEndian.Uint64(data)
return &queueIndex
}

// WriteL1MsgStorageState writes the L1MsgStorage state
func WriteL1MsgStorageState(db ethdb.KeyValueWriter, state []byte) {
if err := db.Put(l1MsgStorageStateKey, state); err != nil {
log.Crit("Failed to update L1MsgStorage state", "err", err)
}
}

// ReadL1MsgStorageState retrieves the L1MsgStorage state
func ReadL1MsgStorageState(db ethdb.Reader) []byte {
data, err := db.Get(l1MsgStorageStateKey)
if err != nil && isNotFoundErr(err) {
return nil
}
if err != nil {
log.Crit("Failed to read highest synced L1 message queue index from database", "err", err)
}
return data
}
2 changes: 2 additions & 0 deletions core/rawdb/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ var (
firstQueueIndexNotInL2BlockPrefix = []byte("q") // firstQueueIndexNotInL2BlockPrefix + L2 block hash -> enqueue index
highestSyncedQueueIndexKey = []byte("HighestSyncedQueueIndex")

l1MsgStorageStateKey = []byte("L1MsgStorageState")

// Scroll rollup event store
rollupEventSyncedL1BlockNumberKey = []byte("R-LastRollupEventSyncedL1BlockNumber")
batchChunkRangesPrefix = []byte("R-bcr")
Expand Down
17 changes: 8 additions & 9 deletions rollup/da_syncer/da/calldata_blob_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"

"github.com/scroll-tech/go-ethereum/accounts/abi"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
"github.com/scroll-tech/go-ethereum/rollup/l1"
Expand Down Expand Up @@ -34,12 +33,12 @@ type CalldataBlobSource struct {
blobClient blob_client.BlobClient
l1height uint64
scrollChainABI *abi.ABI
db ethdb.Database
msgStorage *l1.MsgStorage

l1Finalized uint64
}

func NewCalldataBlobSource(ctx context.Context, l1height uint64, l1Reader *l1.Reader, blobClient blob_client.BlobClient, db ethdb.Database) (*CalldataBlobSource, error) {
func NewCalldataBlobSource(ctx context.Context, l1height uint64, l1Reader *l1.Reader, blobClient blob_client.BlobClient, msgStorage *l1.MsgStorage) (*CalldataBlobSource, error) {
scrollChainABI, err := l1.ScrollChainMetaData.GetAbi()
if err != nil {
return nil, fmt.Errorf("failed to get scroll chain abi: %w", err)
Expand All @@ -50,7 +49,7 @@ func NewCalldataBlobSource(ctx context.Context, l1height uint64, l1Reader *l1.Re
blobClient: blobClient,
l1height: l1height,
scrollChainABI: scrollChainABI,
db: db,
msgStorage: msgStorage,
}, nil
}

Expand Down Expand Up @@ -185,11 +184,11 @@ func (ds *CalldataBlobSource) getCommitBatchDA(batchIndex uint64, commitEvent *l
}
switch args.Version {
case 0:
return NewCommitBatchDAV0(ds.db, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap, commitEvent.BlockNumber())
return NewCommitBatchDAV0(ds.msgStorage, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap, commitEvent.BlockNumber())
case 1:
return NewCommitBatchDAV1(ds.ctx, ds.db, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
return NewCommitBatchDAV1(ds.ctx, ds.msgStorage, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
case 2:
return NewCommitBatchDAV2(ds.ctx, ds.db, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
return NewCommitBatchDAV2(ds.ctx, ds.msgStorage, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
default:
return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version)
}
Expand All @@ -201,9 +200,9 @@ func (ds *CalldataBlobSource) getCommitBatchDA(batchIndex uint64, commitEvent *l
switch args.Version {
case 3:
// we can use V2 for version 3, because it's same
return NewCommitBatchDAV2(ds.ctx, ds.db, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
return NewCommitBatchDAV2(ds.ctx, ds.msgStorage, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
case 4:
return NewCommitBatchDAV4(ds.ctx, ds.db, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
return NewCommitBatchDAV4(ds.ctx, ds.msgStorage, ds.l1Reader, ds.blobClient, commitEvent, args.Version, batchIndex, args.ParentBatchHeader, args.Chunks, args.SkippedL1MessageBitmap)
default:
return nil, fmt.Errorf("failed to decode DA, codec version is unknown: codec version: %d", args.Version)
}
Expand Down
15 changes: 7 additions & 8 deletions rollup/da_syncer/da/commitV0.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"github.com/scroll-tech/da-codec/encoding"
"github.com/scroll-tech/da-codec/encoding/codecv0"

"github.com/scroll-tech/go-ethereum/core/rawdb"
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/serrors"
"github.com/scroll-tech/go-ethereum/rollup/l1"
)

type CommitBatchDAV0 struct {
Expand All @@ -24,7 +23,7 @@ type CommitBatchDAV0 struct {
l1BlockNumber uint64
}

func NewCommitBatchDAV0(db ethdb.Database,
func NewCommitBatchDAV0(msgStorage *l1.MsgStorage,
version uint8,
batchIndex uint64,
parentBatchHeader []byte,
Expand All @@ -37,10 +36,10 @@ func NewCommitBatchDAV0(db ethdb.Database,
return nil, fmt.Errorf("failed to unpack chunks: %d, err: %w", batchIndex, err)
}

return NewCommitBatchDAV0WithChunks(db, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, l1BlockNumber)
return NewCommitBatchDAV0WithChunks(msgStorage, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, l1BlockNumber)
}

func NewCommitBatchDAV0WithChunks(db ethdb.Database,
func NewCommitBatchDAV0WithChunks(msgStorage *l1.MsgStorage,
version uint8,
batchIndex uint64,
parentBatchHeader []byte,
Expand All @@ -49,7 +48,7 @@ func NewCommitBatchDAV0WithChunks(db ethdb.Database,
l1BlockNumber uint64,
) (*CommitBatchDAV0, error) {
parentTotalL1MessagePopped := getBatchTotalL1MessagePopped(parentBatchHeader)
l1Txs, err := getL1Messages(db, parentTotalL1MessagePopped, skippedL1MessageBitmap, getTotalMessagesPoppedFromChunks(decodedChunks))
l1Txs, err := getL1Messages(msgStorage, parentTotalL1MessagePopped, skippedL1MessageBitmap, getTotalMessagesPoppedFromChunks(decodedChunks))
if err != nil {
return nil, fmt.Errorf("failed to get L1 messages for v0 batch %d: %w", batchIndex, err)
}
Expand Down Expand Up @@ -139,7 +138,7 @@ func getTotalMessagesPoppedFromChunks(decodedChunks []*codecv0.DAChunkRawTx) int
return totalL1MessagePopped
}

func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skippedBitmap []byte, totalL1MessagePopped int) ([]*types.L1MessageTx, error) {
func getL1Messages(msgStorage *l1.MsgStorage, parentTotalL1MessagePopped uint64, skippedBitmap []byte, totalL1MessagePopped int) ([]*types.L1MessageTx, error) {
var txs []*types.L1MessageTx

decodedSkippedBitmap, err := encoding.DecodeBitmap(skippedBitmap, totalL1MessagePopped)
Expand All @@ -154,7 +153,7 @@ func getL1Messages(db ethdb.Database, parentTotalL1MessagePopped uint64, skipped
currentIndex++
continue
}
l1Tx := rawdb.ReadL1Message(db, currentIndex)
l1Tx := msgStorage.ReadL1Message(currentIndex)
if l1Tx == nil {
// message not yet available
// we return serrors.EOFError as this will be handled in the syncing pipeline with a backoff and retry
Expand Down
9 changes: 4 additions & 5 deletions rollup/da_syncer/da/commitV1.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,13 @@ import (

"github.com/scroll-tech/go-ethereum/common"
"github.com/scroll-tech/go-ethereum/crypto/kzg4844"
"github.com/scroll-tech/go-ethereum/ethdb"
)

type CommitBatchDAV1 struct {
*CommitBatchDAV0
}

func NewCommitBatchDAV1(ctx context.Context, db ethdb.Database,
func NewCommitBatchDAV1(ctx context.Context, msgStorage *l1.MsgStorage,
l1Reader *l1.Reader,
blobClient blob_client.BlobClient,
commitEvent *l1.CommitBatchEvent,
Expand All @@ -30,10 +29,10 @@ func NewCommitBatchDAV1(ctx context.Context, db ethdb.Database,
chunks [][]byte,
skippedL1MessageBitmap []byte,
) (*CommitBatchDAV1, error) {
return NewCommitBatchDAV1WithBlobDecodeFunc(ctx, db, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv1.DecodeTxsFromBlob)
return NewCommitBatchDAV1WithBlobDecodeFunc(ctx, msgStorage, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv1.DecodeTxsFromBlob)
}

func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, db ethdb.Database,
func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, msgStorage *l1.MsgStorage,
l1Reader *l1.Reader,
blobClient blob_client.BlobClient,
commitEvent *l1.CommitBatchEvent,
Expand Down Expand Up @@ -82,7 +81,7 @@ func NewCommitBatchDAV1WithBlobDecodeFunc(ctx context.Context, db ethdb.Database
return nil, fmt.Errorf("failed to decode txs from blob: %w", err)
}

v0, err := NewCommitBatchDAV0WithChunks(db, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, commitEvent.BlockNumber())
v0, err := NewCommitBatchDAV0WithChunks(msgStorage, version, batchIndex, parentBatchHeader, decodedChunks, skippedL1MessageBitmap, commitEvent.BlockNumber())
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions rollup/da_syncer/da/commitV2.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/scroll-tech/da-codec/encoding/codecv2"

"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
"github.com/scroll-tech/go-ethereum/rollup/l1"
)
Expand All @@ -14,7 +13,7 @@ type CommitBatchDAV2 struct {
*CommitBatchDAV1
}

func NewCommitBatchDAV2(ctx context.Context, db ethdb.Database,
func NewCommitBatchDAV2(ctx context.Context, msgStorage *l1.MsgStorage,
l1Reader *l1.Reader,
blobClient blob_client.BlobClient,
commitEvent *l1.CommitBatchEvent,
Expand All @@ -25,7 +24,7 @@ func NewCommitBatchDAV2(ctx context.Context, db ethdb.Database,
skippedL1MessageBitmap []byte,
) (*CommitBatchDAV2, error) {

v1, err := NewCommitBatchDAV1WithBlobDecodeFunc(ctx, db, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv2.DecodeTxsFromBlob)
v1, err := NewCommitBatchDAV1WithBlobDecodeFunc(ctx, msgStorage, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv2.DecodeTxsFromBlob)
if err != nil {
return nil, err
}
Expand Down
5 changes: 2 additions & 3 deletions rollup/da_syncer/da/commitV4.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

"github.com/scroll-tech/da-codec/encoding/codecv4"

"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
"github.com/scroll-tech/go-ethereum/rollup/l1"
)
Expand All @@ -14,7 +13,7 @@ type CommitBatchDAV4 struct {
*CommitBatchDAV1
}

func NewCommitBatchDAV4(ctx context.Context, db ethdb.Database,
func NewCommitBatchDAV4(ctx context.Context, msgStorage *l1.MsgStorage,
l1Reader *l1.Reader,
blobClient blob_client.BlobClient,
commitEvent *l1.CommitBatchEvent,
Expand All @@ -25,7 +24,7 @@ func NewCommitBatchDAV4(ctx context.Context, db ethdb.Database,
skippedL1MessageBitmap []byte,
) (*CommitBatchDAV2, error) {

v1, err := NewCommitBatchDAV1WithBlobDecodeFunc(ctx, db, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv4.DecodeTxsFromBlob)
v1, err := NewCommitBatchDAV1WithBlobDecodeFunc(ctx, msgStorage, l1Reader, blobClient, commitEvent, version, batchIndex, parentBatchHeader, chunks, skippedL1MessageBitmap, codecv4.DecodeTxsFromBlob)
if err != nil {
return nil, err
}
Expand Down
8 changes: 3 additions & 5 deletions rollup/da_syncer/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"

"github.com/scroll-tech/go-ethereum/core"
"github.com/scroll-tech/go-ethereum/ethdb"
"github.com/scroll-tech/go-ethereum/params"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/blob_client"
"github.com/scroll-tech/go-ethereum/rollup/da_syncer/da"
Expand All @@ -22,22 +21,21 @@ type DataSourceFactory struct {
genesisConfig *params.ChainConfig
l1Reader *l1.Reader
blobClient blob_client.BlobClient
db ethdb.Database
msgStorage *l1.MsgStorage
}

func NewDataSourceFactory(blockchain *core.BlockChain, genesisConfig *params.ChainConfig, config Config, l1Reader *l1.Reader, blobClient blob_client.BlobClient, db ethdb.Database) *DataSourceFactory {
func NewDataSourceFactory(blockchain *core.BlockChain, genesisConfig *params.ChainConfig, config Config, l1Reader *l1.Reader, msgStorage *l1.MsgStorage, blobClient blob_client.BlobClient) *DataSourceFactory {
return &DataSourceFactory{
config: config,
genesisConfig: genesisConfig,
l1Reader: l1Reader,
blobClient: blobClient,
db: db,
}
}

func (ds *DataSourceFactory) OpenDataSource(ctx context.Context, l1height uint64) (DataSource, error) {
if ds.config.FetcherMode == L1RPC {
return da.NewCalldataBlobSource(ctx, l1height, ds.l1Reader, ds.blobClient, ds.db)
return da.NewCalldataBlobSource(ctx, l1height, ds.l1Reader, ds.blobClient, ds.msgStorage)
} else {
return nil, errors.New("snapshot_data_source: not implemented")
}
Expand Down
17 changes: 16 additions & 1 deletion rollup/da_syncer/syncing_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ type SyncingPipeline struct {
expBackoff *backoff.Exponential

l1DeploymentBlock uint64
l1Tracker *l1.Tracker
msgStorage *l1.MsgStorage

db ethdb.Database
blockchain *core.BlockChain
Expand All @@ -51,6 +53,12 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi
if err != nil {
return nil, fmt.Errorf("failed to initialize l1.Reader, err = %w", err)
}
l1Tracker := l1.NewTracker(ctx, ethClient, blockchain.Genesis().Hash())

msgStorage, err := l1.NewMsgStorage(ctx, l1Tracker, l1Reader, l1.LatestChainHead)
if err != nil {
return nil, fmt.Errorf("failed to initialize msg storage, err = %w", err)
}

blobClientList := blob_client.NewBlobClientList()
if config.BeaconNodeAPIEndpoint != "" {
Expand All @@ -71,7 +79,7 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi
log.Crit("DA syncing is enabled but no blob client is configured. Please provide at least one blob client via command line flag.")
}

dataSourceFactory := NewDataSourceFactory(blockchain, genesisConfig, config, l1Reader, blobClientList, db)
dataSourceFactory := NewDataSourceFactory(blockchain, genesisConfig, config, l1Reader, msgStorage, blobClientList)
syncedL1Height := l1DeploymentBlock - 1
from := rawdb.ReadDASyncedL1BlockNumber(db)
if from != nil {
Expand All @@ -90,6 +98,8 @@ func NewSyncingPipeline(ctx context.Context, blockchain *core.BlockChain, genesi
expBackoff: backoff.NewExponential(100*time.Millisecond, 10*time.Second, 100*time.Millisecond),
wg: sync.WaitGroup{},
l1DeploymentBlock: l1DeploymentBlock,
l1Tracker: l1Tracker,
msgStorage: msgStorage,
db: db,
blockchain: blockchain,
blockQueue: blockQueue,
Expand All @@ -109,6 +119,9 @@ func (s *SyncingPipeline) Step() error {
func (s *SyncingPipeline) Start() {
log.Info("sync from DA: starting pipeline")

s.msgStorage.Start()
s.l1Tracker.Start()

s.wg.Add(1)
go func() {
s.mainLoop()
Expand Down Expand Up @@ -213,6 +226,8 @@ func (s *SyncingPipeline) mainLoop() {
func (s *SyncingPipeline) Stop() {
log.Info("sync from DA: stopping pipeline...")
s.cancel()
s.msgStorage.Stop()
s.l1Tracker.Stop()
s.wg.Wait()
log.Info("sync from DA: stopping pipeline... done")
}
Expand Down
Loading
Loading