From d163754b5e348baf86a20dcf793fa175cf49bf43 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 24 Oct 2024 13:37:44 +0300 Subject: [PATCH 1/4] Implement storage for Block metadata --- storage/pebble/block_metadata.go | 105 +++++++++++++++++++++++++++++++ storage/pebble/keys.go | 4 ++ 2 files changed, 109 insertions(+) create mode 100644 storage/pebble/block_metadata.go diff --git a/storage/pebble/block_metadata.go b/storage/pebble/block_metadata.go new file mode 100644 index 00000000..f189cd54 --- /dev/null +++ b/storage/pebble/block_metadata.go @@ -0,0 +1,105 @@ +package pebble + +import ( + "errors" + "fmt" + "sync" + + "github.com/onflow/atree" + + errs "github.com/onflow/flow-evm-gateway/models/errors" +) + +var _ atree.Ledger = &BlockMetadata{} + +type BlockMetadata struct { + store *Storage + mux sync.RWMutex +} + +func NewBlockMetadata(store *Storage) *BlockMetadata { + return &BlockMetadata{ + store: store, + mux: sync.RWMutex{}, + } +} + +func (l *BlockMetadata) GetValue(owner, key []byte) ([]byte, error) { + l.mux.RLock() + defer l.mux.RUnlock() + + id := append(owner, key...) + val, err := l.store.get(bmValue, id) + if err != nil { + return nil, fmt.Errorf( + "failed to get block metadata value at owner %x and key %x: %w", + owner, + key, + err, + ) + } + + return val, nil +} + +func (l *BlockMetadata) SetValue(owner, key, value []byte) error { + l.mux.Lock() + defer l.mux.Unlock() + + id := append(owner, key...) + if err := l.store.set(bmValue, id, value, nil); err != nil { + return fmt.Errorf( + "failed to store block metadata value for owner %x and key %x: %w", + owner, + key, + err, + ) + } + + return nil +} + +func (l *BlockMetadata) ValueExists(owner, key []byte) (bool, error) { + val, err := l.GetValue(owner, key) + if err != nil { + return false, err + } + + return val != nil, nil +} + +func (l *BlockMetadata) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error) { + l.mux.Lock() + defer l.mux.Unlock() + + var index atree.SlabIndex + + val, err := l.store.get(bmSlabIndex, owner) + if err != nil { + if !errors.Is(err, errs.ErrEntityNotFound) { + return atree.SlabIndexUndefined, err + } + } + + if val != nil { + if len(val) != len(index) { + return atree.SlabIndexUndefined, fmt.Errorf( + "slab index was not stored in correct format for owner %x", + owner, + ) + } + + copy(index[:], val) + } + + index.Next() + if err := l.store.set(bmSlabIndex, owner, index[:], nil); err != nil { + return atree.SlabIndexUndefined, fmt.Errorf( + "slab index failed to set for owner %x: %w", + owner, + err, + ) + } + + return index, nil +} diff --git a/storage/pebble/keys.go b/storage/pebble/keys.go index 77411c7a..bf5ebeb7 100644 --- a/storage/pebble/keys.go +++ b/storage/pebble/keys.go @@ -28,6 +28,10 @@ const ( ledgerValue = byte(50) ledgerSlabIndex = byte(51) + // block metadata keys + bmValue = byte(60) + bmSlabIndex = byte(61) + // special keys latestEVMHeightKey = byte(100) latestCadenceHeightKey = byte(102) From 07f2b2787cd6b409bb7bd55f427d146eec600762 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 24 Oct 2024 13:38:58 +0300 Subject: [PATCH 2/4] Gather block & transaction event payloads --- models/block.go | 6 +++--- models/block_test.go | 4 ++-- models/events.go | 16 ++++++++++------ models/receipt_test.go | 2 +- models/transaction.go | 17 +++++++++++------ models/transaction_test.go | 8 ++++---- 6 files changed, 31 insertions(+), 22 deletions(-) diff --git a/models/block.go b/models/block.go index 8ed43ea2..5f312f36 100644 --- a/models/block.go +++ b/models/block.go @@ -71,10 +71,10 @@ func (b *Block) Hash() (gethCommon.Hash, error) { // decodeBlockEvent takes a cadence event that contains executed block payload and // decodes it into the Block type. -func decodeBlockEvent(event cadence.Event) (*Block, error) { +func decodeBlockEvent(event cadence.Event) (*Block, *events.BlockEventPayload, error) { payload, err := events.DecodeBlockEventPayload(event) if err != nil { - return nil, fmt.Errorf( + return nil, nil, fmt.Errorf( "failed to Cadence-decode EVM block event [%s]: %w", event.String(), err, @@ -102,7 +102,7 @@ func decodeBlockEvent(event cadence.Event) (*Block, error) { PrevRandao: payload.PrevRandao, }, FixedHash: fixedHash, - }, nil + }, payload, nil } // blockV0 is the block format, prior to adding the PrevRandao field. diff --git a/models/block_test.go b/models/block_test.go index f8616798..0dfee6f5 100644 --- a/models/block_test.go +++ b/models/block_test.go @@ -112,7 +112,7 @@ func Test_DecodeBlockExecutedEvent(t *testing.T) { encEv, err := ev.Payload.ToCadence(flowGo.Previewnet) require.NoError(t, err) - decBlock, err := decodeBlockEvent(encEv) + decBlock, _, err := decodeBlockEvent(encEv) require.NoError(t, err) assert.Equal(t, decBlock, block) @@ -150,7 +150,7 @@ func Test_DecodingLegacyBlockExecutedEvent(t *testing.T) { hashToCadenceArrayValue(block.TransactionHashRoot), }).WithType(eventType) - b, err := decodeBlockEvent(legacyEvent) + b, _, err := decodeBlockEvent(legacyEvent) require.NoError(t, err) require.Equal(t, block.ParentBlockHash, b.ParentBlockHash) diff --git a/models/events.go b/models/events.go index 76cc92e0..6e0d491a 100644 --- a/models/events.go +++ b/models/events.go @@ -31,10 +31,12 @@ func isTransactionExecutedEvent(event cadence.Event) bool { // CadenceEvents contains Flow emitted events containing one or zero evm block executed event, // and multiple or zero evm transaction events. type CadenceEvents struct { - events flow.BlockEvents // Flow events for a specific flow block - block *Block // EVM block (at most one per Flow block) - transactions []Transaction // transactions in the EVM block - receipts []*Receipt // receipts for transactions + events flow.BlockEvents // Flow events for a specific flow block + block *Block // EVM block (at most one per Flow block) + transactions []Transaction // transactions in the EVM block + receipts []*Receipt // receipts for transactions + blockEventPayload *events.BlockEventPayload + txEventPayloads []events.TransactionEventPayload } // NewCadenceEvents decodes the events into evm types. @@ -97,23 +99,25 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) { return nil, fmt.Errorf("EVM block was already set for Flow block: %d", events.Height) } - block, err := decodeBlockEvent(val) + block, blockEventPayload, err := decodeBlockEvent(val) if err != nil { return nil, err } e.block = block + e.blockEventPayload = blockEventPayload continue } if isTransactionExecutedEvent(val) { - tx, receipt, err := decodeTransactionEvent(val) + tx, receipt, txEventPayload, err := decodeTransactionEvent(val) if err != nil { return nil, err } e.transactions = append(e.transactions, tx) e.receipts = append(e.receipts, receipt) + e.txEventPayloads = append(e.txEventPayloads, *txEventPayload) } } diff --git a/models/receipt_test.go b/models/receipt_test.go index 4bc2e0ab..adae146f 100644 --- a/models/receipt_test.go +++ b/models/receipt_test.go @@ -10,7 +10,7 @@ import ( func Test_DecodeReceipts(t *testing.T) { cdcEv, rec := createTestEvent(t, evmTxBinary) - _, receipt, err := decodeTransactionEvent(cdcEv) + _, receipt, _, err := decodeTransactionEvent(cdcEv) require.NoError(t, err) for i, l := range rec.Logs { diff --git a/models/transaction.go b/models/transaction.go index 8e19474e..b9e1a160 100644 --- a/models/transaction.go +++ b/models/transaction.go @@ -167,10 +167,15 @@ func (tc TransactionCall) MarshalBinary() ([]byte, error) { // decodeTransactionEvent takes a cadence event for transaction executed // and decodes its payload into a Transaction interface and a Receipt. // The concrete type will be either a TransactionCall or a DirectCall. -func decodeTransactionEvent(event cadence.Event) (Transaction, *Receipt, error) { +func decodeTransactionEvent(event cadence.Event) ( + Transaction, + *Receipt, + *events.TransactionEventPayload, + error, +) { txEvent, err := events.DecodeTransactionEventPayload(event) if err != nil { - return nil, nil, fmt.Errorf("failed to Cadence decode transaction event [%s]: %w", event.String(), err) + return nil, nil, nil, fmt.Errorf("failed to Cadence decode transaction event [%s]: %w", event.String(), err) } gethReceipt := &gethTypes.Receipt{ @@ -186,7 +191,7 @@ func decodeTransactionEvent(event cadence.Event) (Transaction, *Receipt, error) if len(txEvent.Logs) > 0 { err = rlp.Decode(bytes.NewReader(txEvent.Logs), &gethReceipt.Logs) if err != nil { - return nil, nil, fmt.Errorf("failed to RLP-decode logs: %w", err) + return nil, nil, nil, fmt.Errorf("failed to RLP-decode logs: %w", err) } } @@ -211,19 +216,19 @@ func decodeTransactionEvent(event cadence.Event) (Transaction, *Receipt, error) if txEvent.TransactionType == types.DirectCallTxType { directCall, err := types.DirectCallFromEncoded(txEvent.Payload) if err != nil { - return nil, nil, fmt.Errorf("failed to RLP-decode direct call [%x]: %w", txEvent.Payload, err) + return nil, nil, nil, fmt.Errorf("failed to RLP-decode direct call [%x]: %w", txEvent.Payload, err) } tx = DirectCall{DirectCall: directCall} } else { gethTx := &gethTypes.Transaction{} if err := gethTx.UnmarshalBinary(txEvent.Payload); err != nil { - return nil, nil, fmt.Errorf("failed to RLP-decode transaction [%x]: %w", txEvent.Payload, err) + return nil, nil, nil, fmt.Errorf("failed to RLP-decode transaction [%x]: %w", txEvent.Payload, err) } receipt.EffectiveGasPrice = gethTx.EffectiveGasTipValue(nil) tx = TransactionCall{Transaction: gethTx} } - return tx, receipt, nil + return tx, receipt, txEvent, nil } func UnmarshalTransaction(value []byte) (Transaction, error) { diff --git a/models/transaction_test.go b/models/transaction_test.go index 3e11e126..09e693f1 100644 --- a/models/transaction_test.go +++ b/models/transaction_test.go @@ -85,7 +85,7 @@ func createTestEvent(t *testing.T, txBinary string) (cadence.Event, *types.Resul func Test_DecodeEVMTransaction(t *testing.T) { cdcEv, _ := createTestEvent(t, evmTxBinary) - decTx, _, err := decodeTransactionEvent(cdcEv) + decTx, _, _, err := decodeTransactionEvent(cdcEv) require.NoError(t, err) require.IsType(t, TransactionCall{}, decTx) @@ -131,7 +131,7 @@ func Test_DecodeEVMTransaction(t *testing.T) { func Test_DecodeDirectCall(t *testing.T) { cdcEv, _ := createTestEvent(t, directCallBinary) - decTx, _, err := decodeTransactionEvent(cdcEv) + decTx, _, _, err := decodeTransactionEvent(cdcEv) require.NoError(t, err) require.IsType(t, DirectCall{}, decTx) @@ -179,7 +179,7 @@ func Test_UnmarshalTransaction(t *testing.T) { cdcEv, _ := createTestEvent(t, evmTxBinary) - tx, _, err := decodeTransactionEvent(cdcEv) + tx, _, _, err := decodeTransactionEvent(cdcEv) require.NoError(t, err) encodedTx, err := tx.MarshalBinary() @@ -233,7 +233,7 @@ func Test_UnmarshalTransaction(t *testing.T) { cdcEv, _ := createTestEvent(t, directCallBinary) - tx, _, err := decodeTransactionEvent(cdcEv) + tx, _, _, err := decodeTransactionEvent(cdcEv) require.NoError(t, err) encodedTx, err := tx.MarshalBinary() From 52cab4d97fb401c065d9fa436e8fce92b638cf6c Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 24 Oct 2024 15:22:35 +0300 Subject: [PATCH 3/4] Pass BasicProvider to ingestion Engine --- bootstrap/bootstrap.go | 44 ++++++++++++++++++++++--------- services/ingestion/engine.go | 5 ++++ services/ingestion/engine_test.go | 5 ++++ storage/pebble/block_metadata.go | 6 +++++ 4 files changed, 47 insertions(+), 13 deletions(-) diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index dbed6f48..a97bce36 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -25,15 +25,20 @@ import ( "github.com/onflow/flow-evm-gateway/services/traces" "github.com/onflow/flow-evm-gateway/storage" "github.com/onflow/flow-evm-gateway/storage/pebble" + "github.com/onflow/flow-go/fvm/evm" + evmTypes "github.com/onflow/flow-go/fvm/evm/types" + + offchain "github.com/onflow/flow-go/fvm/evm/offchain/blocks" ) type Storages struct { - Storage *pebble.Storage - Blocks storage.BlockIndexer - Transactions storage.TransactionIndexer - Receipts storage.ReceiptIndexer - Accounts storage.AccountIndexer - Traces storage.TraceIndexer + Storage *pebble.Storage + Blocks storage.BlockIndexer + BlockMetadata evmTypes.BackendStorage + Transactions storage.TransactionIndexer + Receipts storage.ReceiptIndexer + Accounts storage.AccountIndexer + Traces storage.TraceIndexer } type Publishers struct { @@ -116,17 +121,29 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error { Uint64("missed-heights", latestCadenceBlock.Height-latestCadenceHeight). Msg("indexing cadence height information") + chainID := b.config.FlowNetworkID + // create event subscriber subscriber := ingestion.NewRPCSubscriber( b.client, b.config.HeartbeatInterval, - b.config.FlowNetworkID, + chainID, b.logger, ) + blockProvider, err := offchain.NewBasicProvider( + chainID, + b.storages.BlockMetadata, + evm.StorageAccountAddress(chainID), + ) + if err != nil { + return fmt.Errorf("failed to create blocks BasicProvider") + } + // initialize event ingestion engine b.events = ingestion.NewEventIngestionEngine( subscriber, + blockProvider, b.storages.Storage, b.storages.Blocks, b.storages.Receipts, @@ -517,12 +534,13 @@ func setupStorage( } return &Storages{ - Storage: store, - Blocks: blocks, - Transactions: pebble.NewTransactions(store), - Receipts: pebble.NewReceipts(store), - Accounts: pebble.NewAccounts(store), - Traces: pebble.NewTraces(store), + Storage: store, + Blocks: blocks, + BlockMetadata: pebble.NewBlockMetadata(store), + Transactions: pebble.NewTransactions(store), + Receipts: pebble.NewReceipts(store), + Accounts: pebble.NewAccounts(store), + Traces: pebble.NewTraces(store), }, nil } diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index 37aeae0b..69c7d411 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -13,6 +13,8 @@ import ( "github.com/onflow/flow-evm-gateway/models" "github.com/onflow/flow-evm-gateway/storage" "github.com/onflow/flow-evm-gateway/storage/pebble" + + offchain "github.com/onflow/flow-go/fvm/evm/offchain/blocks" ) var _ models.Engine = &Engine{} @@ -35,6 +37,7 @@ type Engine struct { *models.EngineStatus subscriber EventSubscriber + blockProvider *offchain.BasicProvider store *pebble.Storage blocks storage.BlockIndexer receipts storage.ReceiptIndexer @@ -49,6 +52,7 @@ type Engine struct { func NewEventIngestionEngine( subscriber EventSubscriber, + blockProvider *offchain.BasicProvider, store *pebble.Storage, blocks storage.BlockIndexer, receipts storage.ReceiptIndexer, @@ -65,6 +69,7 @@ func NewEventIngestionEngine( EngineStatus: models.NewEngineStatus(), subscriber: subscriber, + blockProvider: blockProvider, store: store, blocks: blocks, receipts: receipts, diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index c7f6a77b..6fe85654 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -63,6 +63,7 @@ func TestSerialBlockIngestion(t *testing.T) { engine := NewEventIngestionEngine( subscriber, + nil, store, blocks, receipts, @@ -143,6 +144,7 @@ func TestSerialBlockIngestion(t *testing.T) { engine := NewEventIngestionEngine( subscriber, + nil, store, blocks, receipts, @@ -258,6 +260,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { engine := NewEventIngestionEngine( subscriber, + nil, store, blocks, receipts, @@ -361,6 +364,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { engine := NewEventIngestionEngine( subscriber, + nil, store, blocks, receipts, @@ -457,6 +461,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { engine := NewEventIngestionEngine( subscriber, + nil, store, blocks, receipts, diff --git a/storage/pebble/block_metadata.go b/storage/pebble/block_metadata.go index f189cd54..2d867402 100644 --- a/storage/pebble/block_metadata.go +++ b/storage/pebble/block_metadata.go @@ -8,9 +8,11 @@ import ( "github.com/onflow/atree" errs "github.com/onflow/flow-evm-gateway/models/errors" + evmTypes "github.com/onflow/flow-go/fvm/evm/types" ) var _ atree.Ledger = &BlockMetadata{} +var _ evmTypes.BackendStorage = &BlockMetadata{} type BlockMetadata struct { store *Storage @@ -31,6 +33,10 @@ func (l *BlockMetadata) GetValue(owner, key []byte) ([]byte, error) { id := append(owner, key...) val, err := l.store.get(bmValue, id) if err != nil { + if errors.Is(err, errs.ErrEntityNotFound) { + return nil, nil + } + return nil, fmt.Errorf( "failed to get block metadata value at owner %x and key %x: %w", owner, From fada9d733511289c2b1b43b93d33fb42ebd7ae00 Mon Sep 17 00:00:00 2001 From: Ardit Marku Date: Thu, 24 Oct 2024 20:49:22 +0300 Subject: [PATCH 4/4] Replay blocks in event ingestion engine --- bootstrap/bootstrap.go | 1 + models/events.go | 8 ++ services/ingestion/engine.go | 38 +++++++ services/ingestion/engine_test.go | 5 + storage/pebble/register.go | 167 ++++++++++++++++++++++++++++++ 5 files changed, 219 insertions(+) create mode 100644 storage/pebble/register.go diff --git a/bootstrap/bootstrap.go b/bootstrap/bootstrap.go index a97bce36..83e29ff7 100644 --- a/bootstrap/bootstrap.go +++ b/bootstrap/bootstrap.go @@ -153,6 +153,7 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error { b.publishers.Logs, b.logger, b.collector, + b.config, ) StartEngine(ctx, b.events, l) diff --git a/models/events.go b/models/events.go index 6e0d491a..d801e4f1 100644 --- a/models/events.go +++ b/models/events.go @@ -186,6 +186,14 @@ func (c *CadenceEvents) Length() int { return len(c.events.Events) } +func (c *CadenceEvents) BlockEventPayload() *events.BlockEventPayload { + return c.blockEventPayload +} + +func (c *CadenceEvents) TxEventPayloads() []events.TransactionEventPayload { + return c.txEventPayloads +} + // BlockEvents is a wrapper around events streamed, and it also contains an error type BlockEvents struct { Events *CadenceEvents diff --git a/services/ingestion/engine.go b/services/ingestion/engine.go index 69c7d411..b07bf77f 100644 --- a/services/ingestion/engine.go +++ b/services/ingestion/engine.go @@ -9,12 +9,15 @@ import ( gethTypes "github.com/onflow/go-ethereum/core/types" "github.com/rs/zerolog" + "github.com/onflow/flow-evm-gateway/config" "github.com/onflow/flow-evm-gateway/metrics" "github.com/onflow/flow-evm-gateway/models" "github.com/onflow/flow-evm-gateway/storage" "github.com/onflow/flow-evm-gateway/storage/pebble" + "github.com/onflow/flow-go/fvm/evm" offchain "github.com/onflow/flow-go/fvm/evm/offchain/blocks" + "github.com/onflow/flow-go/fvm/evm/offchain/sync" ) var _ models.Engine = &Engine{} @@ -48,6 +51,7 @@ type Engine struct { blocksPublisher *models.Publisher[*models.Block] logsPublisher *models.Publisher[[]*gethTypes.Log] collector metrics.Collector + config *config.Config } func NewEventIngestionEngine( @@ -62,6 +66,7 @@ func NewEventIngestionEngine( logsPublisher *models.Publisher[[]*gethTypes.Log], log zerolog.Logger, collector metrics.Collector, + config *config.Config, ) *Engine { log = log.With().Str("component", "ingestion").Logger() @@ -79,6 +84,7 @@ func NewEventIngestionEngine( blocksPublisher: blocksPublisher, logsPublisher: logsPublisher, collector: collector, + config: config, } } @@ -190,6 +196,38 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error { return fmt.Errorf("failed to index receipts for block %d event: %w", events.Block().Height, err) } + if e.blockProvider != nil { + err = e.blockProvider.OnBlockReceived(events.BlockEventPayload()) + if err != nil { + return fmt.Errorf("failed to call OnBlockReceived: %w", err) + } + + chainID := e.config.FlowNetworkID + rootAddr := evm.StorageAccountAddress(chainID) + storageProvider := pebble.NewRegister( + e.store, + events.Block().Height, + ) + cr := sync.NewReplayer( + chainID, + rootAddr, + storageProvider, + e.blockProvider, + e.log, + nil, + true, + ) + res, err := cr.ReplayBlock(events.TxEventPayloads(), events.BlockEventPayload()) + if err != nil { + return fmt.Errorf("failed to replay block on height: %d, with: %w", events.Block().Height, err) + } + + err = e.blockProvider.OnBlockExecuted(events.Block().Height, res) + if err != nil { + return fmt.Errorf("failed to call OnBlockExecuted: %w", err) + } + } + if err := batch.Commit(pebbleDB.Sync); err != nil { return fmt.Errorf("failed to commit indexed data for Cadence block %d: %w", events.CadenceHeight(), err) } diff --git a/services/ingestion/engine_test.go b/services/ingestion/engine_test.go index 6fe85654..63aa0719 100644 --- a/services/ingestion/engine_test.go +++ b/services/ingestion/engine_test.go @@ -73,6 +73,7 @@ func TestSerialBlockIngestion(t *testing.T) { models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, + nil, ) done := make(chan struct{}) @@ -154,6 +155,7 @@ func TestSerialBlockIngestion(t *testing.T) { models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, + nil, ) waitErr := make(chan struct{}) @@ -270,6 +272,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, + nil, ) done := make(chan struct{}) @@ -374,6 +377,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, + nil, ) done := make(chan struct{}) @@ -471,6 +475,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) { models.NewPublisher[[]*gethTypes.Log](), zerolog.Nop(), metrics.NopCollector, + nil, ) done := make(chan struct{}) diff --git a/storage/pebble/register.go b/storage/pebble/register.go new file mode 100644 index 00000000..1608c712 --- /dev/null +++ b/storage/pebble/register.go @@ -0,0 +1,167 @@ +package pebble + +import ( + "errors" + "fmt" + "sync" + + "github.com/cockroachdb/pebble" + "github.com/onflow/atree" + + errs "github.com/onflow/flow-evm-gateway/models/errors" + + "github.com/onflow/flow-go/fvm/evm/types" +) + +var _ atree.Ledger = &Register{} +var _ types.StorageProvider = &Register{} + +type Register struct { + height uint64 + store *Storage + mux sync.RWMutex +} + +// NewRegister creates a new index instance at the provided height, all reads and +// writes of the registers will happen at that height. +func NewRegister(store *Storage, height uint64) *Register { + return &Register{ + store: store, + height: height, + mux: sync.RWMutex{}, + } +} + +func (l *Register) GetSnapshotAt(evmBlockHeight uint64) (types.BackendStorageSnapshot, error) { + return &Register{ + store: l.store, + height: evmBlockHeight - 1, + mux: sync.RWMutex{}, + }, nil +} + +func (l *Register) GetValue(owner, key []byte) ([]byte, error) { + l.mux.RLock() + defer l.mux.RUnlock() + + db := l.store.db + + iter, err := db.NewIter(&pebble.IterOptions{ + LowerBound: l.idLower(owner, key), + UpperBound: l.idUpper(owner, key), + }) + if err != nil { + return nil, fmt.Errorf("failed to create register range iterator: %w", err) + } + defer func() { + if err := iter.Close(); err != nil { + l.store.log.Error().Err(err).Msg("failed to close register iterator") + } + }() + + found := iter.Last() + if !found { + // as per interface expectation we need to return nil if not found + return nil, nil + } + + val, err := iter.ValueAndErr() + if err != nil { + return nil, fmt.Errorf( + "failed to get ledger value at owner %x and key %x: %w", + owner, + key, + err, + ) + } + + return val, nil +} + +func (l *Register) SetValue(owner, key, value []byte) error { + l.mux.Lock() + defer l.mux.Unlock() + + id := l.id(owner, key) + if err := l.store.set(ledgerValue, id, value, nil); err != nil { + return fmt.Errorf( + "failed to store ledger value for owner %x and key %x: %w", + owner, + key, + err, + ) + } + + return nil +} + +func (l *Register) ValueExists(owner, key []byte) (bool, error) { + val, err := l.GetValue(owner, key) + if err != nil { + return false, err + } + + return val != nil, nil +} + +func (l *Register) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error) { + l.mux.Lock() + defer l.mux.Unlock() + + var index atree.SlabIndex + + val, err := l.store.get(ledgerSlabIndex, owner) + if err != nil { + if !errors.Is(err, errs.ErrEntityNotFound) { + return atree.SlabIndexUndefined, err + } + } + + if val != nil { + if len(val) != len(index) { + return atree.SlabIndexUndefined, fmt.Errorf( + "slab index was not stored in correct format for owner %x", + owner, + ) + } + + copy(index[:], val) + } + + index = index.Next() + if err := l.store.set(ledgerSlabIndex, owner, index[:], nil); err != nil { + return atree.SlabIndexUndefined, fmt.Errorf( + "slab index failed to set for owner %x: %w", + owner, + err, + ) + } + + return index, nil +} + +// id calculates a ledger id with embedded block height for owner and key. +// The key for a register has the following schema: +// {owner}{key}{height} +func (l *Register) id(owner, key []byte) []byte { + id := append(owner, key...) + h := uint64Bytes(l.height) + return append(id, h...) +} + +func (l *Register) idUpper(owner, key []byte) []byte { + id := []byte{ledgerValue} + id = append(id, owner...) + id = append(id, key...) + // increase height +1 because upper bound is exclusive + h := uint64Bytes(l.height + 1) + return append(id, h...) +} + +func (l *Register) idLower(owner, key []byte) []byte { + id := []byte{ledgerValue} + id = append(id, owner...) + id = append(id, key...) + // lower height is always 0 + return append(id, uint64Bytes(0)...) +}