Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mpeter/integrate offchain package pt1 #626

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 32 additions & 13 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,20 @@ import (
"github.com/onflow/flow-evm-gateway/services/traces"
"github.com/onflow/flow-evm-gateway/storage"
"github.com/onflow/flow-evm-gateway/storage/pebble"
"github.com/onflow/flow-go/fvm/evm"
evmTypes "github.com/onflow/flow-go/fvm/evm/types"

offchain "github.com/onflow/flow-go/fvm/evm/offchain/blocks"
)

type Storages struct {
Storage *pebble.Storage
Blocks storage.BlockIndexer
Transactions storage.TransactionIndexer
Receipts storage.ReceiptIndexer
Accounts storage.AccountIndexer
Traces storage.TraceIndexer
Storage *pebble.Storage
Blocks storage.BlockIndexer
BlockMetadata evmTypes.BackendStorage
Transactions storage.TransactionIndexer
Receipts storage.ReceiptIndexer
Accounts storage.AccountIndexer
Traces storage.TraceIndexer
}

type Publishers struct {
Expand Down Expand Up @@ -116,17 +121,29 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
Uint64("missed-heights", latestCadenceBlock.Height-latestCadenceHeight).
Msg("indexing cadence height information")

chainID := b.config.FlowNetworkID

// create event subscriber
subscriber := ingestion.NewRPCSubscriber(
b.client,
b.config.HeartbeatInterval,
b.config.FlowNetworkID,
chainID,
b.logger,
)

blockProvider, err := offchain.NewBasicProvider(
chainID,
b.storages.BlockMetadata,
evm.StorageAccountAddress(chainID),
)
if err != nil {
return fmt.Errorf("failed to create blocks BasicProvider")
}

// initialize event ingestion engine
b.events = ingestion.NewEventIngestionEngine(
subscriber,
blockProvider,
b.storages.Storage,
b.storages.Blocks,
b.storages.Receipts,
Expand All @@ -136,6 +153,7 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
b.publishers.Logs,
b.logger,
b.collector,
b.config,
)

StartEngine(ctx, b.events, l)
Expand Down Expand Up @@ -517,12 +535,13 @@ func setupStorage(
}

return &Storages{
Storage: store,
Blocks: blocks,
Transactions: pebble.NewTransactions(store),
Receipts: pebble.NewReceipts(store),
Accounts: pebble.NewAccounts(store),
Traces: pebble.NewTraces(store),
Storage: store,
Blocks: blocks,
BlockMetadata: pebble.NewBlockMetadata(store),
Transactions: pebble.NewTransactions(store),
Receipts: pebble.NewReceipts(store),
Accounts: pebble.NewAccounts(store),
Traces: pebble.NewTraces(store),
}, nil
}

Expand Down
6 changes: 3 additions & 3 deletions models/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ func (b *Block) Hash() (gethCommon.Hash, error) {

// decodeBlockEvent takes a cadence event that contains executed block payload and
// decodes it into the Block type.
func decodeBlockEvent(event cadence.Event) (*Block, error) {
func decodeBlockEvent(event cadence.Event) (*Block, *events.BlockEventPayload, error) {
payload, err := events.DecodeBlockEventPayload(event)
if err != nil {
return nil, fmt.Errorf(
return nil, nil, fmt.Errorf(
"failed to Cadence-decode EVM block event [%s]: %w",
event.String(),
err,
Expand Down Expand Up @@ -102,7 +102,7 @@ func decodeBlockEvent(event cadence.Event) (*Block, error) {
PrevRandao: payload.PrevRandao,
},
FixedHash: fixedHash,
}, nil
}, payload, nil
}

// blockV0 is the block format, prior to adding the PrevRandao field.
Expand Down
4 changes: 2 additions & 2 deletions models/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func Test_DecodeBlockExecutedEvent(t *testing.T) {
encEv, err := ev.Payload.ToCadence(flowGo.Previewnet)
require.NoError(t, err)

decBlock, err := decodeBlockEvent(encEv)
decBlock, _, err := decodeBlockEvent(encEv)
require.NoError(t, err)

assert.Equal(t, decBlock, block)
Expand Down Expand Up @@ -150,7 +150,7 @@ func Test_DecodingLegacyBlockExecutedEvent(t *testing.T) {
hashToCadenceArrayValue(block.TransactionHashRoot),
}).WithType(eventType)

b, err := decodeBlockEvent(legacyEvent)
b, _, err := decodeBlockEvent(legacyEvent)
require.NoError(t, err)

require.Equal(t, block.ParentBlockHash, b.ParentBlockHash)
Expand Down
24 changes: 18 additions & 6 deletions models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,12 @@ func isTransactionExecutedEvent(event cadence.Event) bool {
// CadenceEvents contains Flow emitted events containing one or zero evm block executed event,
// and multiple or zero evm transaction events.
type CadenceEvents struct {
events flow.BlockEvents // Flow events for a specific flow block
block *Block // EVM block (at most one per Flow block)
transactions []Transaction // transactions in the EVM block
receipts []*Receipt // receipts for transactions
events flow.BlockEvents // Flow events for a specific flow block
block *Block // EVM block (at most one per Flow block)
transactions []Transaction // transactions in the EVM block
receipts []*Receipt // receipts for transactions
blockEventPayload *events.BlockEventPayload
txEventPayloads []events.TransactionEventPayload
}

// NewCadenceEvents decodes the events into evm types.
Expand Down Expand Up @@ -97,23 +99,25 @@ func decodeCadenceEvents(events flow.BlockEvents) (*CadenceEvents, error) {
return nil, fmt.Errorf("EVM block was already set for Flow block: %d", events.Height)
}

block, err := decodeBlockEvent(val)
block, blockEventPayload, err := decodeBlockEvent(val)
if err != nil {
return nil, err
}

e.block = block
e.blockEventPayload = blockEventPayload
continue
}

if isTransactionExecutedEvent(val) {
tx, receipt, err := decodeTransactionEvent(val)
tx, receipt, txEventPayload, err := decodeTransactionEvent(val)
if err != nil {
return nil, err
}

e.transactions = append(e.transactions, tx)
e.receipts = append(e.receipts, receipt)
e.txEventPayloads = append(e.txEventPayloads, *txEventPayload)
}
}

Expand Down Expand Up @@ -182,6 +186,14 @@ func (c *CadenceEvents) Length() int {
return len(c.events.Events)
}

func (c *CadenceEvents) BlockEventPayload() *events.BlockEventPayload {
return c.blockEventPayload
}

func (c *CadenceEvents) TxEventPayloads() []events.TransactionEventPayload {
return c.txEventPayloads
}

// BlockEvents is a wrapper around events streamed, and it also contains an error
type BlockEvents struct {
Events *CadenceEvents
Expand Down
2 changes: 1 addition & 1 deletion models/receipt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func Test_DecodeReceipts(t *testing.T) {
cdcEv, rec := createTestEvent(t, evmTxBinary)

_, receipt, err := decodeTransactionEvent(cdcEv)
_, receipt, _, err := decodeTransactionEvent(cdcEv)
require.NoError(t, err)

for i, l := range rec.Logs {
Expand Down
17 changes: 11 additions & 6 deletions models/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,15 @@ func (tc TransactionCall) MarshalBinary() ([]byte, error) {
// decodeTransactionEvent takes a cadence event for transaction executed
// and decodes its payload into a Transaction interface and a Receipt.
// The concrete type will be either a TransactionCall or a DirectCall.
func decodeTransactionEvent(event cadence.Event) (Transaction, *Receipt, error) {
func decodeTransactionEvent(event cadence.Event) (
Transaction,
*Receipt,
*events.TransactionEventPayload,
error,
) {
txEvent, err := events.DecodeTransactionEventPayload(event)
if err != nil {
return nil, nil, fmt.Errorf("failed to Cadence decode transaction event [%s]: %w", event.String(), err)
return nil, nil, nil, fmt.Errorf("failed to Cadence decode transaction event [%s]: %w", event.String(), err)
}

gethReceipt := &gethTypes.Receipt{
Expand All @@ -186,7 +191,7 @@ func decodeTransactionEvent(event cadence.Event) (Transaction, *Receipt, error)
if len(txEvent.Logs) > 0 {
err = rlp.Decode(bytes.NewReader(txEvent.Logs), &gethReceipt.Logs)
if err != nil {
return nil, nil, fmt.Errorf("failed to RLP-decode logs: %w", err)
return nil, nil, nil, fmt.Errorf("failed to RLP-decode logs: %w", err)
}
}

Expand All @@ -211,19 +216,19 @@ func decodeTransactionEvent(event cadence.Event) (Transaction, *Receipt, error)
if txEvent.TransactionType == types.DirectCallTxType {
directCall, err := types.DirectCallFromEncoded(txEvent.Payload)
if err != nil {
return nil, nil, fmt.Errorf("failed to RLP-decode direct call [%x]: %w", txEvent.Payload, err)
return nil, nil, nil, fmt.Errorf("failed to RLP-decode direct call [%x]: %w", txEvent.Payload, err)
}
tx = DirectCall{DirectCall: directCall}
} else {
gethTx := &gethTypes.Transaction{}
if err := gethTx.UnmarshalBinary(txEvent.Payload); err != nil {
return nil, nil, fmt.Errorf("failed to RLP-decode transaction [%x]: %w", txEvent.Payload, err)
return nil, nil, nil, fmt.Errorf("failed to RLP-decode transaction [%x]: %w", txEvent.Payload, err)
}
receipt.EffectiveGasPrice = gethTx.EffectiveGasTipValue(nil)
tx = TransactionCall{Transaction: gethTx}
}

return tx, receipt, nil
return tx, receipt, txEvent, nil
}

func UnmarshalTransaction(value []byte) (Transaction, error) {
Expand Down
8 changes: 4 additions & 4 deletions models/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func createTestEvent(t *testing.T, txBinary string) (cadence.Event, *types.Resul
func Test_DecodeEVMTransaction(t *testing.T) {
cdcEv, _ := createTestEvent(t, evmTxBinary)

decTx, _, err := decodeTransactionEvent(cdcEv)
decTx, _, _, err := decodeTransactionEvent(cdcEv)
require.NoError(t, err)
require.IsType(t, TransactionCall{}, decTx)

Expand Down Expand Up @@ -131,7 +131,7 @@ func Test_DecodeEVMTransaction(t *testing.T) {
func Test_DecodeDirectCall(t *testing.T) {
cdcEv, _ := createTestEvent(t, directCallBinary)

decTx, _, err := decodeTransactionEvent(cdcEv)
decTx, _, _, err := decodeTransactionEvent(cdcEv)
require.NoError(t, err)
require.IsType(t, DirectCall{}, decTx)

Expand Down Expand Up @@ -179,7 +179,7 @@ func Test_UnmarshalTransaction(t *testing.T) {

cdcEv, _ := createTestEvent(t, evmTxBinary)

tx, _, err := decodeTransactionEvent(cdcEv)
tx, _, _, err := decodeTransactionEvent(cdcEv)
require.NoError(t, err)

encodedTx, err := tx.MarshalBinary()
Expand Down Expand Up @@ -233,7 +233,7 @@ func Test_UnmarshalTransaction(t *testing.T) {

cdcEv, _ := createTestEvent(t, directCallBinary)

tx, _, err := decodeTransactionEvent(cdcEv)
tx, _, _, err := decodeTransactionEvent(cdcEv)
require.NoError(t, err)

encodedTx, err := tx.MarshalBinary()
Expand Down
43 changes: 43 additions & 0 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,15 @@ import (
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/rs/zerolog"

"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-evm-gateway/models"
"github.com/onflow/flow-evm-gateway/storage"
"github.com/onflow/flow-evm-gateway/storage/pebble"
"github.com/onflow/flow-go/fvm/evm"

offchain "github.com/onflow/flow-go/fvm/evm/offchain/blocks"
"github.com/onflow/flow-go/fvm/evm/offchain/sync"
)

var _ models.Engine = &Engine{}
Expand All @@ -35,6 +40,7 @@ type Engine struct {
*models.EngineStatus

subscriber EventSubscriber
blockProvider *offchain.BasicProvider
store *pebble.Storage
blocks storage.BlockIndexer
receipts storage.ReceiptIndexer
Expand All @@ -45,10 +51,12 @@ type Engine struct {
blocksPublisher *models.Publisher[*models.Block]
logsPublisher *models.Publisher[[]*gethTypes.Log]
collector metrics.Collector
config *config.Config
}

func NewEventIngestionEngine(
subscriber EventSubscriber,
blockProvider *offchain.BasicProvider,
store *pebble.Storage,
blocks storage.BlockIndexer,
receipts storage.ReceiptIndexer,
Expand All @@ -58,13 +66,15 @@ func NewEventIngestionEngine(
logsPublisher *models.Publisher[[]*gethTypes.Log],
log zerolog.Logger,
collector metrics.Collector,
config *config.Config,
) *Engine {
log = log.With().Str("component", "ingestion").Logger()

return &Engine{
EngineStatus: models.NewEngineStatus(),

subscriber: subscriber,
blockProvider: blockProvider,
store: store,
blocks: blocks,
receipts: receipts,
Expand All @@ -74,6 +84,7 @@ func NewEventIngestionEngine(
blocksPublisher: blocksPublisher,
logsPublisher: logsPublisher,
collector: collector,
config: config,
}
}

Expand Down Expand Up @@ -185,6 +196,38 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
return fmt.Errorf("failed to index receipts for block %d event: %w", events.Block().Height, err)
}

if e.blockProvider != nil {
err = e.blockProvider.OnBlockReceived(events.BlockEventPayload())
if err != nil {
return fmt.Errorf("failed to call OnBlockReceived: %w", err)
}

chainID := e.config.FlowNetworkID
rootAddr := evm.StorageAccountAddress(chainID)
storageProvider := pebble.NewRegister(
e.store,
events.Block().Height,
)
cr := sync.NewReplayer(
chainID,
rootAddr,
storageProvider,
e.blockProvider,
e.log,
nil,
true,
)
res, err := cr.ReplayBlock(events.TxEventPayloads(), events.BlockEventPayload())
if err != nil {
return fmt.Errorf("failed to replay block on height: %d, with: %w", events.Block().Height, err)
}

err = e.blockProvider.OnBlockExecuted(events.Block().Height, res)
if err != nil {
return fmt.Errorf("failed to call OnBlockExecuted: %w", err)
}
}

if err := batch.Commit(pebbleDB.Sync); err != nil {
return fmt.Errorf("failed to commit indexed data for Cadence block %d: %w", events.CadenceHeight(), err)
}
Expand Down
Loading
Loading