Skip to content

Commit

Permalink
Replay blocks in event ingestion engine
Browse files Browse the repository at this point in the history
  • Loading branch information
m-Peter committed Oct 24, 2024
1 parent 52cab4d commit fada9d7
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 0 deletions.
1 change: 1 addition & 0 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
b.publishers.Logs,
b.logger,
b.collector,
b.config,
)

StartEngine(ctx, b.events, l)
Expand Down
8 changes: 8 additions & 0 deletions models/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,14 @@ func (c *CadenceEvents) Length() int {
return len(c.events.Events)
}

func (c *CadenceEvents) BlockEventPayload() *events.BlockEventPayload {
return c.blockEventPayload
}

func (c *CadenceEvents) TxEventPayloads() []events.TransactionEventPayload {
return c.txEventPayloads
}

// BlockEvents is a wrapper around events streamed, and it also contains an error
type BlockEvents struct {
Events *CadenceEvents
Expand Down
38 changes: 38 additions & 0 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,15 @@ import (
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/rs/zerolog"

"github.com/onflow/flow-evm-gateway/config"
"github.com/onflow/flow-evm-gateway/metrics"
"github.com/onflow/flow-evm-gateway/models"
"github.com/onflow/flow-evm-gateway/storage"
"github.com/onflow/flow-evm-gateway/storage/pebble"
"github.com/onflow/flow-go/fvm/evm"

offchain "github.com/onflow/flow-go/fvm/evm/offchain/blocks"
"github.com/onflow/flow-go/fvm/evm/offchain/sync"
)

var _ models.Engine = &Engine{}
Expand Down Expand Up @@ -48,6 +51,7 @@ type Engine struct {
blocksPublisher *models.Publisher[*models.Block]
logsPublisher *models.Publisher[[]*gethTypes.Log]
collector metrics.Collector
config *config.Config
}

func NewEventIngestionEngine(
Expand All @@ -62,6 +66,7 @@ func NewEventIngestionEngine(
logsPublisher *models.Publisher[[]*gethTypes.Log],
log zerolog.Logger,
collector metrics.Collector,
config *config.Config,
) *Engine {
log = log.With().Str("component", "ingestion").Logger()

Expand All @@ -79,6 +84,7 @@ func NewEventIngestionEngine(
blocksPublisher: blocksPublisher,
logsPublisher: logsPublisher,
collector: collector,
config: config,
}
}

Expand Down Expand Up @@ -190,6 +196,38 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
return fmt.Errorf("failed to index receipts for block %d event: %w", events.Block().Height, err)
}

if e.blockProvider != nil {
err = e.blockProvider.OnBlockReceived(events.BlockEventPayload())
if err != nil {
return fmt.Errorf("failed to call OnBlockReceived: %w", err)
}

chainID := e.config.FlowNetworkID
rootAddr := evm.StorageAccountAddress(chainID)
storageProvider := pebble.NewRegister(
e.store,
events.Block().Height,
)
cr := sync.NewReplayer(
chainID,
rootAddr,
storageProvider,
e.blockProvider,
e.log,
nil,
true,
)
res, err := cr.ReplayBlock(events.TxEventPayloads(), events.BlockEventPayload())
if err != nil {
return fmt.Errorf("failed to replay block on height: %d, with: %w", events.Block().Height, err)
}

err = e.blockProvider.OnBlockExecuted(events.Block().Height, res)
if err != nil {
return fmt.Errorf("failed to call OnBlockExecuted: %w", err)
}
}

if err := batch.Commit(pebbleDB.Sync); err != nil {
return fmt.Errorf("failed to commit indexed data for Cadence block %d: %w", events.CadenceHeight(), err)
}
Expand Down
5 changes: 5 additions & 0 deletions services/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func TestSerialBlockIngestion(t *testing.T) {
models.NewPublisher[[]*gethTypes.Log](),
zerolog.Nop(),
metrics.NopCollector,
nil,
)

done := make(chan struct{})
Expand Down Expand Up @@ -154,6 +155,7 @@ func TestSerialBlockIngestion(t *testing.T) {
models.NewPublisher[[]*gethTypes.Log](),
zerolog.Nop(),
metrics.NopCollector,
nil,
)

waitErr := make(chan struct{})
Expand Down Expand Up @@ -270,6 +272,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
models.NewPublisher[[]*gethTypes.Log](),
zerolog.Nop(),
metrics.NopCollector,
nil,
)

done := make(chan struct{})
Expand Down Expand Up @@ -374,6 +377,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
models.NewPublisher[[]*gethTypes.Log](),
zerolog.Nop(),
metrics.NopCollector,
nil,
)

done := make(chan struct{})
Expand Down Expand Up @@ -471,6 +475,7 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
models.NewPublisher[[]*gethTypes.Log](),
zerolog.Nop(),
metrics.NopCollector,
nil,
)

done := make(chan struct{})
Expand Down
167 changes: 167 additions & 0 deletions storage/pebble/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package pebble

import (
"errors"
"fmt"
"sync"

"github.com/cockroachdb/pebble"
"github.com/onflow/atree"

errs "github.com/onflow/flow-evm-gateway/models/errors"

"github.com/onflow/flow-go/fvm/evm/types"
)

var _ atree.Ledger = &Register{}
var _ types.StorageProvider = &Register{}

type Register struct {
height uint64
store *Storage
mux sync.RWMutex
}

// NewRegister creates a new index instance at the provided height, all reads and
// writes of the registers will happen at that height.
func NewRegister(store *Storage, height uint64) *Register {
return &Register{
store: store,
height: height,
mux: sync.RWMutex{},
}
}

func (l *Register) GetSnapshotAt(evmBlockHeight uint64) (types.BackendStorageSnapshot, error) {
return &Register{
store: l.store,
height: evmBlockHeight - 1,
mux: sync.RWMutex{},
}, nil
}

func (l *Register) GetValue(owner, key []byte) ([]byte, error) {
l.mux.RLock()
defer l.mux.RUnlock()

db := l.store.db

iter, err := db.NewIter(&pebble.IterOptions{
LowerBound: l.idLower(owner, key),
UpperBound: l.idUpper(owner, key),
})
if err != nil {
return nil, fmt.Errorf("failed to create register range iterator: %w", err)
}
defer func() {
if err := iter.Close(); err != nil {
l.store.log.Error().Err(err).Msg("failed to close register iterator")
}
}()

found := iter.Last()
if !found {
// as per interface expectation we need to return nil if not found
return nil, nil
}

val, err := iter.ValueAndErr()
if err != nil {
return nil, fmt.Errorf(
"failed to get ledger value at owner %x and key %x: %w",
owner,
key,
err,
)
}

return val, nil
}

func (l *Register) SetValue(owner, key, value []byte) error {
l.mux.Lock()
defer l.mux.Unlock()

id := l.id(owner, key)
if err := l.store.set(ledgerValue, id, value, nil); err != nil {
return fmt.Errorf(
"failed to store ledger value for owner %x and key %x: %w",
owner,
key,
err,
)
}

return nil
}

func (l *Register) ValueExists(owner, key []byte) (bool, error) {
val, err := l.GetValue(owner, key)
if err != nil {
return false, err
}

return val != nil, nil
}

func (l *Register) AllocateSlabIndex(owner []byte) (atree.SlabIndex, error) {
l.mux.Lock()
defer l.mux.Unlock()

var index atree.SlabIndex

val, err := l.store.get(ledgerSlabIndex, owner)
if err != nil {
if !errors.Is(err, errs.ErrEntityNotFound) {
return atree.SlabIndexUndefined, err
}
}

if val != nil {
if len(val) != len(index) {
return atree.SlabIndexUndefined, fmt.Errorf(
"slab index was not stored in correct format for owner %x",
owner,
)
}

copy(index[:], val)
}

index = index.Next()
if err := l.store.set(ledgerSlabIndex, owner, index[:], nil); err != nil {
return atree.SlabIndexUndefined, fmt.Errorf(
"slab index failed to set for owner %x: %w",
owner,
err,
)
}

return index, nil
}

// id calculates a ledger id with embedded block height for owner and key.
// The key for a register has the following schema:
// {owner}{key}{height}
func (l *Register) id(owner, key []byte) []byte {
id := append(owner, key...)
h := uint64Bytes(l.height)
return append(id, h...)
}

func (l *Register) idUpper(owner, key []byte) []byte {
id := []byte{ledgerValue}
id = append(id, owner...)
id = append(id, key...)
// increase height +1 because upper bound is exclusive
h := uint64Bytes(l.height + 1)
return append(id, h...)
}

func (l *Register) idLower(owner, key []byte) []byte {
id := []byte{ledgerValue}
id = append(id, owner...)
id = append(id, key...)
// lower height is always 0
return append(id, uint64Bytes(0)...)
}

0 comments on commit fada9d7

Please sign in to comment.