diff --git a/cl/antiquary/antiquary.go b/cl/antiquary/antiquary.go
index 79d736b9708..1f36f508969 100644
--- a/cl/antiquary/antiquary.go
+++ b/cl/antiquary/antiquary.go
@@ -163,6 +163,12 @@ func (a *Antiquary) Loop() error {
     if err := a.sn.OpenFolder(); err != nil {
         return err
     }
+    if a.stateSn != nil {
+        if err := a.stateSn.OpenFolder(); err != nil {
+            return err
+        }
+    }
+
     defer logInterval.Stop()
     if from != a.sn.BlocksAvailable() && a.sn.BlocksAvailable() != 0 {
         a.logger.Info("[Antiquary] Stopping Caplin to process historical indicies", "from", from, "to", a.sn.BlocksAvailable())
diff --git a/cl/antiquary/state_antiquary.go b/cl/antiquary/state_antiquary.go
index c293d27583a..abe3ffe8a51 100644
--- a/cl/antiquary/state_antiquary.go
+++ b/cl/antiquary/state_antiquary.go
@@ -488,11 +488,15 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
     if err != nil {
         return err
     }
+
     log.Info("Historical states antiquated", "slot", s.currentState.Slot(), "root", libcommon.Hash(stateRoot), "latency", endTime)
-    if s.snapgen {
+    if s.stateSn != nil {
         if err := s.stateSn.OpenFolder(); err != nil {
             return err
         }
+    }
+
+    if s.snapgen {
         // Keep gnosis out for a bit
         if s.currentState.BeaconConfig().ConfigName == "gnosis" {
diff --git a/cl/validator/committee_subscription/committee_subscription.go b/cl/validator/committee_subscription/committee_subscription.go
index ca8c9dffadd..652e99968cf 100644
--- a/cl/validator/committee_subscription/committee_subscription.go
+++ b/cl/validator/committee_subscription/committee_subscription.go
@@ -69,7 +69,6 @@ func NewCommitteeSubscribeManagement(
     netConfig *clparams.NetworkConfig,
     ethClock eth_clock.EthereumClock,
     sentinel sentinel.SentinelClient,
-    state *state.CachingBeaconState,
     aggregationPool aggregation.AggregationPool,
     syncedData *synced_data.SyncedDataManager,
 ) *CommitteeSubscribeMgmt {
@@ -79,7 +78,6 @@ func NewCommitteeSubscribeManagement(
         netConfig:       netConfig,
         ethClock:        ethClock,
         sentinel:        sentinel,
-        state:           state,
         aggregationPool: aggregationPool,
         syncedData:      syncedData,
         validatorSubs:   make(map[uint64]*validatorSub),
diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go
index da0208c6510..4eb48502d59 100644
--- a/cmd/caplin/caplin1/run.go
+++ b/cmd/caplin/caplin1/run.go
@@ -303,7 +303,7 @@ func RunCaplinService(ctx context.Context, engine execution_client.ExecutionEngi
         return err
     }
     beaconRpc := rpc.NewBeaconRpcP2P(ctx, sentinel, beaconConfig, ethClock)
-    committeeSub := committee_subscription.NewCommitteeSubscribeManagement(ctx, indexDB, beaconConfig, networkConfig, ethClock, sentinel, state, aggregationPool, syncedDataManager)
+    committeeSub := committee_subscription.NewCommitteeSubscribeManagement(ctx, indexDB, beaconConfig, networkConfig, ethClock, sentinel, aggregationPool, syncedDataManager)
     batchSignatureVerifier := services.NewBatchSignatureVerifier(ctx, sentinel)
     // Define gossip services
     blockService := services.NewBlockService(ctx, indexDB, forkChoice, syncedDataManager, ethClock, beaconConfig, emitters)
diff --git a/erigon-lib/chain/snapcfg/util.go b/erigon-lib/chain/snapcfg/util.go
index 6a13bff1e6a..c092d4ff216 100644
--- a/erigon-lib/chain/snapcfg/util.go
+++ b/erigon-lib/chain/snapcfg/util.go
@@ -94,6 +94,11 @@ func (p Preverified) Typed(types []snaptype.Type) Preverified {
             continue
         }
+        if strings.HasPrefix(p.Name, "caplin") {
+            bestVersions.Set(p.Name, p)
+            continue
+        }
+
         var preferredVersion, minVersion snaptype.Version
         countSep := 0
@@ -393,6 +398,9 @@ func (c Cfg) MergeLimit(t snaptype.Enum, fromBlock uint64) uint64 {
         if !ok {
            continue
         }
+        if strings.Contains(p.Name, "caplin") {
+            continue
+        }
         if info.Ext != ".seg" || (t != snaptype.Unknown && t != info.Type.Enum()) {
             continue
diff --git a/erigon-lib/downloader/snaptype/files.go b/erigon-lib/downloader/snaptype/files.go
index 4ac92207dc2..aebac45771a 100644
--- a/erigon-lib/downloader/snaptype/files.go
+++ b/erigon-lib/downloader/snaptype/files.go
@@ -124,6 +124,9 @@ func ParseFileName(dir, fileName string) (res FileInfo, isE3Seedable bool, ok bo
             }
         }
     }
+    if strings.Contains(fileName, "caplin/") {
+        return res, isStateFile, true
+    }
     return res, isStateFile, isStateFile
 }
@@ -266,8 +269,7 @@ func (f FileInfo) CompareTo(o FileInfo) int {
         return res
     }
-    // this is a lexical comparison (don't use enum)
-    return strings.Compare(f.Type.Name(), o.Type.Name())
+    return strings.Compare(f.name, o.name)
 }
 
 func (f FileInfo) As(t Type) FileInfo {
diff --git a/eth/backend.go b/eth/backend.go
index 8ef6968d21e..647538ff57e 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -1017,7 +1017,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
     backend.polygonDownloadSync = stagedsync.New(backend.config.Sync, stagedsync.DownloadSyncStages(
         backend.sentryCtx, stagedsync.StageSnapshotsCfg(
             backend.chainDB, *backend.sentriesClient.ChainConfig, config.Sync, dirs, blockRetire, backend.downloaderClient,
-            blockReader, backend.notifications, backend.agg, false, false, backend.silkworm, config.Prune,
+            blockReader, backend.notifications, backend.agg, false, false, false, backend.silkworm, config.Prune,
         )), nil, nil, backend.logger)
     // these range extractors set the db to the local db instead of the chain db
diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go
index f8e056623e9..ff96f11442f 100644
--- a/eth/stagedsync/stage_snapshots.go
+++ b/eth/stagedsync/stage_snapshots.go
@@ -95,6 +95,7 @@ type SnapshotsCfg struct {
     caplin           bool
     blobs            bool
+    caplinState      bool
     agg              *state.Aggregator
     silkworm         *silkworm.Silkworm
     snapshotUploader *snapshotUploader
@@ -113,6 +114,7 @@ func StageSnapshotsCfg(db kv.RwDB,
     agg *state.Aggregator,
     caplin bool,
     blobs bool,
+    caplinState bool,
     silkworm *silkworm.Silkworm,
     prune prune.Mode,
 ) SnapshotsCfg {
@@ -130,6 +132,7 @@ func StageSnapshotsCfg(db kv.RwDB,
         syncConfig:  syncConfig,
         blobs:       blobs,
         prune:       prune,
+        caplinState: caplinState,
     }
     if uploadFs := cfg.syncConfig.UploadLocation; len(uploadFs) > 0 {
@@ -277,7 +280,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
             diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "Download header-chain"})
             // Download only the snapshots that are for the header chain.
-            if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, true /*headerChain=*/, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil {
+            if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, true /*headerChain=*/, cfg.blobs, cfg.caplinState, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil {
                 return err
             }
@@ -286,7 +289,7 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R
     }
     diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "Download snapshots"})
-    if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, false /*headerChain=*/, cfg.blobs, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil {
+    if err := snapshotsync.WaitForDownloader(ctx, s.LogPrefix(), cfg.dirs, false /*headerChain=*/, cfg.blobs, cfg.caplinState, cfg.prune, cstate, cfg.agg, tx, cfg.blockReader, &cfg.chainConfig, cfg.snapshotDownloader, s.state.StagesIdsList()); err != nil {
         return err
     }
     if cfg.notifier.Events != nil {
diff --git a/turbo/snapshotsync/snapshotsync.go b/turbo/snapshotsync/snapshotsync.go
index e37be4f5c70..facbe50c24e 100644
--- a/turbo/snapshotsync/snapshotsync.go
+++ b/turbo/snapshotsync/snapshotsync.go
@@ -283,7 +283,7 @@ func computeBlocksToPrune(blockReader blockReader, p prune.Mode) (blocksToPrune
 // WaitForDownloader - wait for Downloader service to download all expected snapshots
 // for MVP we sync with Downloader only once, in future will send new snapshots also
-func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs, headerchain, blobs bool, prune prune.Mode, caplin CaplinMode, agg *state.Aggregator, tx kv.RwTx, blockReader blockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient, stagesIdsList []string) error {
+func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs, headerchain, blobs, caplinState bool, prune prune.Mode, caplin CaplinMode, agg *state.Aggregator, tx kv.RwTx, blockReader blockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient, stagesIdsList []string) error {
     snapshots := blockReader.Snapshots()
     borSnapshots := blockReader.BorSnapshots()
@@ -331,15 +331,18 @@ func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs,
     // build all download requests
     for _, p := range preverifiedBlockSnapshots {
-        if caplin == NoCaplin && (strings.Contains(p.Name, "beaconblocks") || strings.Contains(p.Name, "blobsidecars")) {
+        if caplin == NoCaplin && (strings.Contains(p.Name, "beaconblocks") || strings.Contains(p.Name, "blobsidecars") || strings.Contains(p.Name, "caplin")) {
             continue
         }
-        if caplin == OnlyCaplin && !strings.Contains(p.Name, "beaconblocks") && !strings.Contains(p.Name, "blobsidecars") {
+        if caplin == OnlyCaplin && !strings.Contains(p.Name, "beaconblocks") && !strings.Contains(p.Name, "blobsidecars") && !strings.Contains(p.Name, "caplin") {
             continue
         }
         if !blobs && strings.Contains(p.Name, "blobsidecars") {
             continue
         }
+        if !caplinState && strings.Contains(p.Name, "caplin/") {
+            continue
+        }
         if headerchain && !strings.Contains(p.Name, "headers") && !strings.Contains(p.Name, "bodies") {
             continue
         }
@@ -466,6 +469,13 @@ func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs,
                 return err
             }
         }
+        if caplinState {
+            if _, err := snapshotDownloader.ProhibitNewDownloads(ctx, &proto_downloader.ProhibitNewDownloadsRequest{
+                Type: "caplin",
+            }); err != nil {
+                return err
+            }
+        }
     }
     firstNonGenesis, err := rawdbv3.SecondKey(tx, kv.Headers)
diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go
index d656ab15910..03c334d8124 100644
--- a/turbo/stages/mock/mock_sentry.go
+++ b/turbo/stages/mock/mock_sentry.go
@@ -525,7 +525,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
     mock.agg.SetProduceMod(mock.BlockReader.FreezingCfg().ProduceE3)
     mock.Sync = stagedsync.New(
         cfg.Sync,
-        stagedsync.DefaultStages(mock.Ctx, stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, mock.BlockReader, mock.Notifications, mock.agg, false, false, nil, prune), stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, cfg.Sync, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, mock.BlockReader, blockWriter, dirs.Tmp, mock.Notifications), stagedsync.StageBorHeimdallCfg(mock.DB, snapDb, stagedsync.MiningState{}, *mock.ChainConfig, nil, nil, nil, mock.BlockReader, nil, nil, recents, signatures, false, nil), stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig, blockWriter), stagedsync.StageBodiesCfg(mock.DB, mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, cfg.Sync.BodyDownloadTimeoutSeconds, *mock.ChainConfig, mock.BlockReader, blockWriter), stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, cfg.Sync, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd), stagedsync.StageExecuteBlocksCfg(
+        stagedsync.DefaultStages(mock.Ctx, stagedsync.StageSnapshotsCfg(mock.DB, *mock.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, mock.BlockReader, mock.Notifications, mock.agg, false, false, false, nil, prune), stagedsync.StageHeadersCfg(mock.DB, mock.sentriesClient.Hd, mock.sentriesClient.Bd, *mock.ChainConfig, cfg.Sync, sendHeaderRequest, propagateNewBlockHashes, penalize, cfg.BatchSize, false, mock.BlockReader, blockWriter, dirs.Tmp, mock.Notifications), stagedsync.StageBorHeimdallCfg(mock.DB, snapDb, stagedsync.MiningState{}, *mock.ChainConfig, nil, nil, nil, mock.BlockReader, nil, nil, recents, signatures, false, nil), stagedsync.StageBlockHashesCfg(mock.DB, mock.Dirs.Tmp, mock.ChainConfig, blockWriter), stagedsync.StageBodiesCfg(mock.DB, mock.sentriesClient.Bd, sendBodyRequest, penalize, blockPropagator, cfg.Sync.BodyDownloadTimeoutSeconds, *mock.ChainConfig, mock.BlockReader, blockWriter), stagedsync.StageSendersCfg(mock.DB, mock.ChainConfig, cfg.Sync, false, dirs.Tmp, prune, mock.BlockReader, mock.sentriesClient.Hd), stagedsync.StageExecuteBlocksCfg(
             mock.DB,
             prune,
             cfg.BatchSize,
diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go
index 1edea3b9ecc..fbe0cdffa60 100644
--- a/turbo/stages/stageloop.go
+++ b/turbo/stages/stageloop.go
@@ -693,7 +693,7 @@ func NewDefaultStages(ctx context.Context,
     runInTestMode := cfg.ImportMode
     return stagedsync.DefaultStages(ctx,
-        stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm, cfg.Prune),
+        stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, cfg.CaplinConfig.Archive, silkworm, cfg.Prune),
         stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications),
         stagedsync.StageBorHeimdallCfg(db, snapDb, stagedsync.MiningState{}, *controlServer.ChainConfig, heimdallClient, heimdallStore, bridgeStore, blockReader, controlServer.Hd, controlServer.Penalize, recents, signatures, cfg.WithHeimdallWaypointRecording, nil),
         stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
@@ -734,7 +734,7 @@ func NewPipelineStages(ctx context.Context,
     if len(cfg.Sync.UploadLocation) == 0 {
         return stagedsync.PipelineStages(ctx,
-            stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm, cfg.Prune),
+            stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, cfg.CaplinConfig.Archive, silkworm, cfg.Prune),
             stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
             stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd),
             stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications, cfg.StateStream, false, false, cfg.ChaosMonkey, dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg)),
@@ -743,7 +743,7 @@ func NewPipelineStages(ctx context.Context,
     return stagedsync.UploaderPipelineStages(ctx,
-        stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm, cfg.Prune),
+        stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, cfg.CaplinConfig.Archive, silkworm, cfg.Prune),
         stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications),
         stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
         stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd),
@@ -802,6 +802,7 @@ func NewPolygonSyncStages(
             agg,
             config.InternalCL && config.CaplinConfig.Backfilling,
             config.CaplinConfig.BlobBackfilling,
+            config.CaplinConfig.Archive,
             silkworm,
             config.Prune,
         ),