Skip to content

Commit

Permalink
Merge pull request #652 from onflow/janez/storage-fixes
Browse files Browse the repository at this point in the history
Storage fixes
  • Loading branch information
janezpodhostnik authored Nov 13, 2024
2 parents 6781972 + ab6775e commit 381d878
Show file tree
Hide file tree
Showing 19 changed files with 401 additions and 732 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ generate:
mockery --dir=storage --name=BlockIndexer --output=storage/mocks
mockery --dir=storage --name=ReceiptIndexer --output=storage/mocks
mockery --dir=storage --name=TransactionIndexer --output=storage/mocks
mockery --dir=storage --name=AccountIndexer --output=storage/mocks
mockery --dir=storage --name=TraceIndexer --output=storage/mocks
mockery --all --dir=services/ingestion --output=services/ingestion/mocks
mockery --dir=models --name=Engine --output=models/mocks
Expand Down
16 changes: 0 additions & 16 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ type BlockChainAPI struct {
blocks storage.BlockIndexer
transactions storage.TransactionIndexer
receipts storage.ReceiptIndexer
accounts storage.AccountIndexer
indexingResumedHeight uint64
limiter limiter.Store
collector metrics.Collector
Expand All @@ -167,7 +166,6 @@ func NewBlockChainAPI(
blocks storage.BlockIndexer,
transactions storage.TransactionIndexer,
receipts storage.ReceiptIndexer,
accounts storage.AccountIndexer,
ratelimiter limiter.Store,
collector metrics.Collector,
) (*BlockChainAPI, error) {
Expand All @@ -184,7 +182,6 @@ func NewBlockChainAPI(
blocks: blocks,
transactions: transactions,
receipts: receipts,
accounts: accounts,
indexingResumedHeight: indexingResumedHeight,
limiter: ratelimiter,
collector: collector,
Expand Down Expand Up @@ -762,19 +759,6 @@ func (b *BlockChainAPI) GetTransactionCount(
return handleError[*hexutil.Uint64](err, l, b.collector)
}

nonce, err := b.accounts.GetNonce(address)
if err != nil {
return handleError[*hexutil.Uint64](errs.ErrInternal, l, b.collector)
}

// compare both until we gain confidence in db nonce tracking working correctly
if nonce != networkNonce {
l.Error().
Uint64("network-nonce", networkNonce).
Uint64("db-nonce", nonce).
Msg("network nonce does not equal db nonce")
}

return (*hexutil.Uint64)(&networkNonce), nil
}

Expand Down
24 changes: 17 additions & 7 deletions bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type Storages struct {
Blocks storage.BlockIndexer
Transactions storage.TransactionIndexer
Receipts storage.ReceiptIndexer
Accounts storage.AccountIndexer
Traces storage.TraceIndexer
}

Expand Down Expand Up @@ -156,7 +155,6 @@ func (b *Bootstrap) StartEventIngestion(ctx context.Context) error {
b.storages.Blocks,
b.storages.Receipts,
b.storages.Transactions,
b.storages.Accounts,
b.storages.Traces,
b.publishers.Block,
b.publishers.Logs,
Expand Down Expand Up @@ -251,7 +249,6 @@ func (b *Bootstrap) StartAPIServer(ctx context.Context) error {
b.storages.Blocks,
b.storages.Transactions,
b.storages.Receipts,
b.storages.Accounts,
ratelimiter,
b.collector,
)
Expand Down Expand Up @@ -388,6 +385,16 @@ func (b *Bootstrap) StopProfilerServer() {
}
}

func (b *Bootstrap) StopDB() {
if b.storages == nil || b.storages.Storage == nil {
return
}
err := b.storages.Storage.Close()
if err != nil {
b.logger.Err(err).Msg("PebbleDB graceful shutdown failed")
}
}

// StartEngine starts provided engine and panics if there are startup errors.
func StartEngine(
ctx context.Context,
Expand Down Expand Up @@ -488,12 +495,13 @@ func setupStorage(
}(batch)

cadenceHeight := config.InitCadenceHeight
evmBlokcHeight := uint64(0)
cadenceBlock, err := client.GetBlockHeaderByHeight(context.Background(), cadenceHeight)
if err != nil {
return nil, fmt.Errorf("could not fetch provided cadence height, make sure it's correct: %w", err)
}

snapshot, err := registerStore.GetSnapshotAt(0)
snapshot, err := registerStore.GetSnapshotAt(evmBlokcHeight)
if err != nil {
return nil, fmt.Errorf("could not get register snapshot at block height %d: %w", 0, err)
}
Expand All @@ -509,7 +517,7 @@ func setupStorage(
return nil, fmt.Errorf("could not set account status: %w", err)
}

err = registerStore.Store(delta.GetUpdates(), cadenceHeight, batch)
err = registerStore.Store(delta.GetUpdates(), evmBlokcHeight, batch)
if err != nil {
return nil, fmt.Errorf("could not store register updates: %w", err)
}
Expand All @@ -528,7 +536,9 @@ func setupStorage(
return nil, fmt.Errorf("could not commit register updates: %w", err)
}

logger.Info().Msgf("database initialized with cadence height: %d", cadenceHeight)
logger.Info().
Stringer("fvm_address_for_evm_storage_account", storageAddress).
Msgf("database initialized with cadence height: %d", cadenceHeight)
}
//else {
// // TODO(JanezP): verify storage account owner is correct
Expand All @@ -540,7 +550,6 @@ func setupStorage(
Registers: registerStore,
Transactions: pebble.NewTransactions(store),
Receipts: pebble.NewReceipts(store),
Accounts: pebble.NewAccounts(store),
Traces: pebble.NewTraces(store),
}, nil
}
Expand Down Expand Up @@ -580,6 +589,7 @@ func Run(ctx context.Context, cfg *config.Config, ready chan struct{}) error {
boot.StopEventIngestion()
boot.StopMetricsServer()
boot.StopAPIServer()
boot.StopDB()

return nil
}
53 changes: 28 additions & 25 deletions services/ingestion/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type Engine struct {
blocks storage.BlockIndexer
receipts storage.ReceiptIndexer
transactions storage.TransactionIndexer
accounts storage.AccountIndexer
traces storage.TraceIndexer
log zerolog.Logger
evmLastHeight *models.SequentialHeight
Expand All @@ -64,7 +63,6 @@ func NewEventIngestionEngine(
blocks storage.BlockIndexer,
receipts storage.ReceiptIndexer,
transactions storage.TransactionIndexer,
accounts storage.AccountIndexer,
traces storage.TraceIndexer,
blocksPublisher *models.Publisher[*models.Block],
logsPublisher *models.Publisher[[]*gethTypes.Log],
Expand All @@ -84,7 +82,6 @@ func NewEventIngestionEngine(
blocks: blocks,
receipts: receipts,
transactions: transactions,
accounts: accounts,
traces: traces,
log: log,
blocksPublisher: blocksPublisher,
Expand All @@ -96,7 +93,8 @@ func NewEventIngestionEngine(

// Stop the engine.
func (e *Engine) Stop() {
// todo
e.MarkDone()
<-e.Stopped()
}

// Run the Cadence event ingestion engine.
Expand All @@ -118,23 +116,33 @@ func (e *Engine) Run(ctx context.Context) error {
e.log.Info().Msg("starting ingestion")

e.MarkReady()

for events := range e.subscriber.Subscribe(ctx) {
if events.Err != nil {
return fmt.Errorf(
"failure in event subscription with: %w",
events.Err,
)
}

err := e.processEvents(events.Events)
if err != nil {
e.log.Error().Err(err).Msg("failed to process EVM events")
return err
defer e.MarkStopped()

events := e.subscriber.Subscribe(ctx)

for {
select {
case <-e.Done():
// stop the engine
return nil
case events, ok := <-events:
if !ok {
return nil
}
if events.Err != nil {
return fmt.Errorf(
"failure in event subscription with: %w",
events.Err,
)
}

err := e.processEvents(events.Events)
if err != nil {
e.log.Error().Err(err).Msg("failed to process EVM events")
return err
}
}
}

return nil
}

// processEvents converts the events to block and transactions and indexes them.
Expand Down Expand Up @@ -168,8 +176,7 @@ func (e *Engine) processEvents(events *models.CadenceEvents) error {
return nil // nothing else to do this was heartbeat event with not event payloads
}

// TODO(JanezP): accounts need an indexed batch. Investigate why and try to switch to non-indexed batch
batch := e.store.NewIndexedBatch()
batch := e.store.NewBatch()
defer func(batch *pebbleDB.Batch) {
err := batch.Close()
if err != nil {
Expand Down Expand Up @@ -327,10 +334,6 @@ func (e *Engine) indexTransaction(
return fmt.Errorf("failed to store tx: %s, with: %w", tx.Hash(), err)
}

if err := e.accounts.Update(tx, receipt, batch); err != nil {
return fmt.Errorf("failed to update accounts for tx: %s, with: %w", tx.Hash(), err)
}

return nil
}

Expand Down
30 changes: 0 additions & 30 deletions services/ingestion/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ func TestSerialBlockIngestion(t *testing.T) {
}).
Once() // make sure this isn't called multiple times

accounts := &storageMock.AccountIndexer{}
accounts.
On("Update").
Return(func() error { return nil })

traces := &storageMock.TraceIndexer{}

eventsChan := make(chan models.BlockEvents)
Expand All @@ -77,7 +72,6 @@ func TestSerialBlockIngestion(t *testing.T) {
blocks,
receipts,
transactions,
accounts,
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
Expand Down Expand Up @@ -139,11 +133,6 @@ func TestSerialBlockIngestion(t *testing.T) {
}).
Once() // make sure this isn't called multiple times

accounts := &storageMock.AccountIndexer{}
accounts.
On("Update", mock.Anything, mock.Anything).
Return(func(t models.TransactionCall, r *gethTypes.Receipt) error { return nil })

traces := &storageMock.TraceIndexer{}

eventsChan := make(chan models.BlockEvents)
Expand All @@ -162,7 +151,6 @@ func TestSerialBlockIngestion(t *testing.T) {
blocks,
receipts,
transactions,
accounts,
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
Expand Down Expand Up @@ -255,11 +243,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
return nil
})

accounts := &storageMock.AccountIndexer{}
accounts.
On("Update", mock.AnythingOfType("models.TransactionCall"), mock.AnythingOfType("*models.Receipt"), mock.Anything).
Return(func(tx models.Transaction, receipt *models.Receipt, _ *pebbleDB.Batch) error { return nil })

eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
Expand Down Expand Up @@ -289,7 +272,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
blocks,
receipts,
transactions,
accounts,
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
Expand Down Expand Up @@ -369,11 +351,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
On("SetLatestCadenceHeight", mock.AnythingOfType("uint64")).
Return(func(h uint64) error { return nil })

accounts := &storageMock.AccountIndexer{}
accounts.
On("Update", mock.AnythingOfType("models.TransactionCall"), mock.AnythingOfType("*models.Receipt"), mock.Anything).
Return(func(tx models.Transaction, receipt *models.Receipt, _ *pebbleDB.Batch) error { return nil })

eventsChan := make(chan models.BlockEvents)
subscriber := &mocks.EventSubscriber{}
subscriber.
Expand Down Expand Up @@ -403,7 +380,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
blocks,
receipts,
transactions,
accounts,
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
Expand Down Expand Up @@ -479,11 +455,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
}).
Once() // make sure this isn't called multiple times

accounts := &storageMock.AccountIndexer{}
accounts.
On("Update", mock.Anything, mock.AnythingOfType("*models.Receipt"), mock.Anything).
Return(func(t models.Transaction, r *models.Receipt, _ *pebbleDB.Batch) error { return nil })

traces := &storageMock.TraceIndexer{}

eventsChan := make(chan models.BlockEvents)
Expand All @@ -503,7 +474,6 @@ func TestBlockAndTransactionIngestion(t *testing.T) {
blocks,
receipts,
transactions,
accounts,
traces,
models.NewPublisher[*models.Block](),
models.NewPublisher[[]*gethTypes.Log](),
Expand Down
Loading

0 comments on commit 381d878

Please sign in to comment.