diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 6e6c72b5f76..a701f6908ac 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -787,75 +787,74 @@ func stageBorHeimdall(db kv.RwDB, ctx context.Context, unwindTypes []string, log heimdallClient := engine.(*bor.Bor).HeimdallClient - return db.Update(ctx, func(tx kv.RwTx) error { - if reset { - if err := reset2.ResetBorHeimdall(ctx, tx); err != nil { - return err - } - return nil - } - if unwind > 0 { - sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger) - defer sn.Close() - defer borSn.Close() - defer agg.Close() - - stageState := stage(sync, tx, nil, stages.BorHeimdall) - - snapshotsMaxBlock := borSn.BlocksAvailable() - if unwind <= snapshotsMaxBlock { - return fmt.Errorf("cannot unwind past snapshots max block: %d", snapshotsMaxBlock) - } - - if unwind > stageState.BlockNumber { - return fmt.Errorf("cannot unwind to a point beyond stage: %d", stageState.BlockNumber) - } - - unwindState := sync.NewUnwindState(stages.BorHeimdall, stageState.BlockNumber-unwind, stageState.BlockNumber, true, false) - cfg := stagedsync.StageBorHeimdallCfg(db, nil, miningState, *chainConfig, nil, heimdallStore, bridgeStore, nil, nil, nil, nil, nil, false, unwindTypes) - if err := stagedsync.BorHeimdallUnwind(unwindState, ctx, stageState, tx, cfg); err != nil { - return err - } - - stageProgress, err := stages.GetStageProgress(tx, stages.BorHeimdall) - if err != nil { - return fmt.Errorf("re-read bor heimdall progress: %w", err) - } - - logger.Info("progress", "bor heimdall", stageProgress) - return nil + var tx kv.RwTx + if reset { + if err := reset2.ResetBorHeimdall(ctx, tx, db); err != nil { + return err } - + return nil + } + if unwind > 0 { sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger) defer sn.Close() defer borSn.Close() defer agg.Close() - blockReader, _ := blocksIO(db, logger) - var ( - snapDb kv.RwDB - recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot] - signatures *lru.ARCCache[libcommon.Hash, libcommon.Address] - ) - if bor, ok := engine.(*bor.Bor); ok { - snapDb = bor.DB - recents = bor.Recents - signatures = bor.Signatures - } - cfg := stagedsync.StageBorHeimdallCfg(db, snapDb, miningState, *chainConfig, heimdallClient, heimdallStore, bridgeStore, blockReader, nil, nil, recents, signatures, false, unwindTypes) stageState := stage(sync, tx, nil, stages.BorHeimdall) - if err := stagedsync.BorHeimdallForward(stageState, sync, ctx, tx, cfg, logger); err != nil { + + snapshotsMaxBlock := borSn.BlocksAvailable() + if unwind <= snapshotsMaxBlock { + return fmt.Errorf("cannot unwind past snapshots max block: %d", snapshotsMaxBlock) + } + + if unwind > stageState.BlockNumber { + return fmt.Errorf("cannot unwind to a point beyond stage: %d", stageState.BlockNumber) + } + + unwindState := sync.NewUnwindState(stages.BorHeimdall, stageState.BlockNumber-unwind, stageState.BlockNumber, true, false) + cfg := stagedsync.StageBorHeimdallCfg(db, nil, miningState, *chainConfig, nil, heimdallStore, bridgeStore, nil, nil, nil, nil, nil, false, unwindTypes) + if err := stagedsync.BorHeimdallUnwind(unwindState, ctx, stageState, tx, cfg); err != nil { return err } - stageProgress, err := stages.GetStageProgress(tx, stages.BorHeimdall) + stageProgress, err := stagedsync.BorHeimdallStageProgress(tx, cfg) if err != nil { return fmt.Errorf("re-read bor heimdall progress: %w", err) } logger.Info("progress", "bor heimdall", stageProgress) return nil - }) + } + + sn, borSn, agg, _, bridgeStore, heimdallStore := allSnapshots(ctx, db, logger) + defer sn.Close() + defer borSn.Close() + defer agg.Close() + blockReader, _ := blocksIO(db, logger) + var ( + snapDb kv.RwDB + recents *lru.ARCCache[libcommon.Hash, *bor.Snapshot] + signatures *lru.ARCCache[libcommon.Hash, libcommon.Address] + ) + if bor, ok := engine.(*bor.Bor); ok { + snapDb = bor.DB + recents = bor.Recents + signatures = bor.Signatures + } + cfg := stagedsync.StageBorHeimdallCfg(db, snapDb, miningState, *chainConfig, heimdallClient, heimdallStore, bridgeStore, blockReader, nil, nil, recents, signatures, false, unwindTypes) + + stageState := stage(sync, tx, nil, stages.BorHeimdall) + if err := stagedsync.BorHeimdallForward(stageState, sync, ctx, tx, cfg, logger); err != nil { + return err + } + + stageProgress, err := stagedsync.BorHeimdallStageProgress(tx, cfg) + if err != nil { + return fmt.Errorf("re-read bor heimdall progress: %w", err) + } + + logger.Info("progress", "bor heimdall", stageProgress) + return nil } func stageBodies(db kv.RwDB, ctx context.Context, logger log.Logger) error { diff --git a/core/rawdb/rawdbreset/reset_stages.go b/core/rawdb/rawdbreset/reset_stages.go index 9cd23a3232b..dcc4886027b 100644 --- a/core/rawdb/rawdbreset/reset_stages.go +++ b/core/rawdb/rawdbreset/reset_stages.go @@ -99,7 +99,16 @@ func ResetBlocks(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services.Full return nil } -func ResetBorHeimdall(ctx context.Context, tx kv.RwTx) error { +func ResetBorHeimdall(ctx context.Context, tx kv.RwTx, db kv.RwDB) error { + useExternalTx := tx != nil + if !useExternalTx { + var err error + tx, err = db.BeginRw(ctx) + if err != nil { + return err + } + defer tx.Rollback() + } if err := tx.ClearBucket(kv.BorEventNums); err != nil { return err } @@ -109,7 +118,13 @@ func ResetBorHeimdall(ctx context.Context, tx kv.RwTx) error { if err := tx.ClearBucket(kv.BorSpans); err != nil { return err } - return clearStageProgress(tx, stages.BorHeimdall) + if err := clearStageProgress(tx, stages.BorHeimdall); err != nil { + return err + } + if !useExternalTx { + return tx.Commit() + } + return nil } func ResetPolygonSync(tx kv.RwTx, db kv.RoDB, agg *state.Aggregator, br services.FullBlockReader, bw *blockio.BlockWriter, dirs datadir.Dirs, cc chain.Config, logger log.Logger) error { diff --git a/eth/stagedsync/stage_execute.go b/eth/stagedsync/stage_execute.go index 894f1f56de6..9d75c8b7804 100644 --- a/eth/stagedsync/stage_execute.go +++ b/eth/stagedsync/stage_execute.go @@ -152,7 +152,7 @@ func ExecBlockV3(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64 workersCount = 1 } - prevStageProgress, err := senderStageProgress(txc.Tx, cfg.db) + prevStageProgress, err := stageProgress(txc.Tx, cfg.db, stages.Senders) if err != nil { return err } @@ -230,15 +230,15 @@ func unwindExec3(u *UnwindState, s *StageState, txc wrap.TxContainer, ctx contex return nil } -func senderStageProgress(tx kv.Tx, db kv.RoDB) (prevStageProgress uint64, err error) { +func stageProgress(tx kv.Tx, db kv.RoDB, stage stages.SyncStage) (prevStageProgress uint64, err error) { if tx != nil { - prevStageProgress, err = stages.GetStageProgress(tx, stages.Senders) + prevStageProgress, err = stages.GetStageProgress(tx, stage) if err != nil { return prevStageProgress, err } } else { if err = db.View(context.Background(), func(tx kv.Tx) error { - prevStageProgress, err = stages.GetStageProgress(tx, stages.Senders) + prevStageProgress, err = stages.GetStageProgress(tx, stage) if err != nil { return err } @@ -250,6 +250,10 @@ func senderStageProgress(tx kv.Tx, db kv.RoDB) (prevStageProgress uint64, err er return prevStageProgress, nil } +func BorHeimdallStageProgress(tx kv.Tx, cfg BorHeimdallCfg) (prevStageProgress uint64, err error) { + return stageProgress(tx, cfg.db, stages.BorHeimdall) +} + // ================ Erigon3 End ================ func SpawnExecuteBlocksStage(s *StageState, u Unwinder, txc wrap.TxContainer, toBlock uint64, ctx context.Context, cfg ExecuteBlockCfg, logger log.Logger) (err error) { diff --git a/migrations/clear_bor_tables.go b/migrations/clear_bor_tables.go index da8095ce897..bff30464dc4 100644 --- a/migrations/clear_bor_tables.go +++ b/migrations/clear_bor_tables.go @@ -23,7 +23,7 @@ var ClearBorTables = Migration{ return err } - if err := reset2.ResetBorHeimdall(context.Background(), tx); err != nil { + if err := reset2.ResetBorHeimdall(context.Background(), tx, db); err != nil { return err }