diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 2a985da3dd8..6ec79994829 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -12,6 +12,7 @@ import ( "time" "github.com/ipfs/boxo/bitswap" + "github.com/ipfs/go-cid" badger "github.com/ipfs/go-ds-badger2" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/routing" @@ -65,6 +66,8 @@ import ( "github.com/onflow/flow-go/module/execution" "github.com/onflow/flow-go/module/executiondatasync/execution_data" execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" + "github.com/onflow/flow-go/module/executiondatasync/pruner" + "github.com/onflow/flow-go/module/executiondatasync/tracker" finalizer "github.com/onflow/flow-go/module/finalizer/consensus" "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/id" @@ -124,40 +127,42 @@ import ( // For a node running as a standalone process, the config fields will be populated from the command line params, // while for a node running as a library, the config fields are expected to be initialized by the caller. type AccessNodeConfig struct { - supportsObserver bool // True if this is an Access node that supports observers and consensus follower engines - collectionGRPCPort uint - executionGRPCPort uint - pingEnabled bool - nodeInfoFile string - apiRatelimits map[string]int - apiBurstlimits map[string]int - rpcConf rpc.Config - stateStreamConf statestreambackend.Config - stateStreamFilterConf map[string]int - ExecutionNodeAddress string // deprecated - HistoricalAccessRPCs []access.AccessAPIClient - logTxTimeToFinalized bool - logTxTimeToExecuted bool - logTxTimeToFinalizedExecuted bool - retryEnabled bool - rpcMetricsEnabled bool - executionDataSyncEnabled bool - publicNetworkExecutionDataEnabled bool - executionDataDir string - executionDataStartHeight uint64 - executionDataConfig edrequester.ExecutionDataConfig - PublicNetworkConfig PublicNetworkConfig - TxResultCacheSize uint - TxErrorMessagesCacheSize uint - executionDataIndexingEnabled bool - registersDBPath string - checkpointFile string - scriptExecutorConfig query.QueryConfig - scriptExecMinBlock uint64 - scriptExecMaxBlock uint64 - registerCacheType string - registerCacheSize uint - programCacheSize uint + supportsObserver bool // True if this is an Access node that supports observers and consensus follower engines + collectionGRPCPort uint + executionGRPCPort uint + pingEnabled bool + nodeInfoFile string + apiRatelimits map[string]int + apiBurstlimits map[string]int + rpcConf rpc.Config + stateStreamConf statestreambackend.Config + stateStreamFilterConf map[string]int + ExecutionNodeAddress string // deprecated + HistoricalAccessRPCs []access.AccessAPIClient + logTxTimeToFinalized bool + logTxTimeToExecuted bool + logTxTimeToFinalizedExecuted bool + retryEnabled bool + rpcMetricsEnabled bool + executionDataSyncEnabled bool + publicNetworkExecutionDataEnabled bool + executionDataPrunerHeightRangeTarget uint64 + executionDataPrunerThreshold uint64 + executionDataDir string + executionDataStartHeight uint64 + executionDataConfig edrequester.ExecutionDataConfig + PublicNetworkConfig PublicNetworkConfig + TxResultCacheSize uint + TxErrorMessagesCacheSize uint + executionDataIndexingEnabled bool + registersDBPath string + checkpointFile string + scriptExecutorConfig query.QueryConfig + scriptExecMinBlock uint64 + scriptExecMaxBlock uint64 + 
registerCacheType string + registerCacheSize uint + programCacheSize uint } type PublicNetworkConfig struct { @@ -246,15 +251,17 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { RetryDelay: edrequester.DefaultRetryDelay, MaxRetryDelay: edrequester.DefaultMaxRetryDelay, }, - executionDataIndexingEnabled: false, - registersDBPath: filepath.Join(homedir, ".flow", "execution_state"), - checkpointFile: cmd.NotSet, - scriptExecutorConfig: query.NewDefaultConfig(), - scriptExecMinBlock: 0, - scriptExecMaxBlock: math.MaxUint64, - registerCacheType: pStorage.CacheTypeTwoQueue.String(), - registerCacheSize: 0, - programCacheSize: 0, + executionDataIndexingEnabled: false, + executionDataPrunerHeightRangeTarget: 0, + executionDataPrunerThreshold: 100_000, + registersDBPath: filepath.Join(homedir, ".flow", "execution_state"), + checkpointFile: cmd.NotSet, + scriptExecutorConfig: query.NewDefaultConfig(), + scriptExecMinBlock: 0, + scriptExecMaxBlock: math.MaxUint64, + registerCacheType: pStorage.CacheTypeTwoQueue.String(), + registerCacheSize: 0, + programCacheSize: 0, } } @@ -288,6 +295,7 @@ type FlowAccessNodeBuilder struct { PublicBlobService network.BlobService ExecutionDataRequester state_synchronization.ExecutionDataRequester ExecutionDataStore execution_data.ExecutionDataStore + ExecutionDataBlobstore blobs.Blobstore ExecutionDataCache *execdatacache.ExecutionDataCache ExecutionIndexer *indexer.Indexer ExecutionIndexerCore *indexer.IndexerCore @@ -298,6 +306,9 @@ type FlowAccessNodeBuilder struct { TxResultsIndex *index.TransactionResultsIndex IndexerDependencies *cmd.DependencyList collectionExecutedMetric module.CollectionExecutedMetric + ExecutionDataPruner *pruner.Pruner + ExecutionDataDatastore *badger.Datastore + ExecutionDataTracker tracker.Storage // The sync engine participants provider is the libp2p peer store for the access node // which is not available until after the network has started. @@ -502,7 +513,6 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu } func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder { - var ds *badger.Datastore var bs network.BlobService var processedBlockHeight storage.ConsumerProgress var processedNotifications storage.ConsumerProgress @@ -526,13 +536,12 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess return err } - ds, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions) + builder.ExecutionDataDatastore, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions) if err != nil { return err } - builder.ShutdownFunc(func() error { - if err := ds.Close(); err != nil { + if err := builder.ExecutionDataDatastore.Close(); err != nil { return fmt.Errorf("could not close execution data datastore: %w", err) } return nil @@ -543,13 +552,13 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess Module("processed block height consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. - processedBlockHeight = bstorage.NewConsumerProgress(ds.DB, module.ConsumeProgressExecutionDataRequesterBlockHeight) + processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterBlockHeight) return nil }). 
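Aside: both builders now keep the datastore on a field (`ExecutionDataDatastore`) instead of a local `ds`, because the pruner's garbage-collection callback and the tracker's prune callback need the same Badger instance that backs the blob service and the consumer-progress trackers above. A minimal sketch of that shared setup, using only packages this diff already imports (`openExecutionDataStore` itself is a hypothetical helper, not part of the PR):

```go
package main

import (
	"fmt"

	badger "github.com/ipfs/go-ds-badger2"

	"github.com/onflow/flow-go/module/blobs"
	"github.com/onflow/flow-go/module/executiondatasync/execution_data"
)

// openExecutionDataStore is a hypothetical helper showing the shared setup:
// one Badger instance backs the blob service, the consumer-progress trackers,
// and the pruner's CollectGarbage callback.
func openExecutionDataStore(dir string) (*badger.Datastore, execution_data.ExecutionDataStore, error) {
	ds, err := badger.NewDatastore(dir, &badger.DefaultOptions)
	if err != nil {
		return nil, nil, fmt.Errorf("could not open execution data datastore: %w", err)
	}

	// the blobstore wraps the datastore; both views hit the same Badger DB
	blobstore := blobs.NewBlobstore(ds)
	store := execution_data.NewExecutionDataStore(blobstore, execution_data.DefaultSerializer)
	return ds, store, nil
}

func main() {
	ds, _, err := openExecutionDataStore("/tmp/execution_data")
	if err != nil {
		panic(err)
	}
	defer ds.Close()
}
```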
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. - processedNotifications = bstorage.NewConsumerProgress(ds.DB, module.ConsumeProgressExecutionDataRequesterNotification) + processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterNotification) return nil }). Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error { @@ -558,8 +567,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess return nil }). Module("execution datastore", func(node *cmd.NodeConfig) error { - blobstore := blobs.NewBlobstore(ds) - builder.ExecutionDataStore = execution_data.NewExecutionDataStore(blobstore, execution_data.DefaultSerializer) + builder.ExecutionDataBlobstore = blobs.NewBlobstore(builder.ExecutionDataDatastore) + builder.ExecutionDataStore = execution_data.NewExecutionDataStore(builder.ExecutionDataBlobstore, execution_data.DefaultSerializer) return nil }). Module("execution data cache", func(node *cmd.NodeConfig) error { @@ -601,7 +610,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess } var err error - bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, ds, opts...) + bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, builder.ExecutionDataDatastore, opts...) if err != nil { return nil, fmt.Errorf("could not register blob service: %w", err) } @@ -611,8 +620,34 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess // to be ready before starting bsDependable.Init(bs) - builder.ExecutionDataDownloader = execution_data.NewDownloader(bs) + var downloaderOpts []execution_data.DownloaderOption + + if builder.executionDataPrunerHeightRangeTarget != 0 { + sealed, err := node.State.Sealed().Head() + if err != nil { + return nil, fmt.Errorf("cannot get the sealed block: %w", err) + } + + trackerDir := filepath.Join(builder.executionDataDir, "tracker") + builder.ExecutionDataTracker, err = tracker.OpenStorage( + trackerDir, + sealed.Height, + node.Logger, + tracker.WithPruneCallback(func(c cid.Cid) error { + // TODO: use a proper context here + return builder.ExecutionDataBlobstore.DeleteBlob(context.TODO(), c) + }), + ) + if err != nil { + return nil, fmt.Errorf("failed to create execution data tracker: %w", err) + } + + downloaderOpts = []execution_data.DownloaderOption{ + execution_data.WithExecutionDataTracker(builder.ExecutionDataTracker, node.Storage.Headers), + } + } + builder.ExecutionDataDownloader = execution_data.NewDownloader(bs, downloaderOpts...) return builder.ExecutionDataDownloader, nil }). Component("execution data requester", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { @@ -683,6 +718,50 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess requesterDependable.Init(builder.ExecutionDataRequester) return builder.ExecutionDataRequester, nil + }). 
+ Component("execution data pruner", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + if builder.executionDataPrunerHeightRangeTarget == 0 { + return &module.NoopReadyDoneAware{}, nil + } + + execDataDistributor.AddOnExecutionDataReceivedConsumer(func(data *execution_data.BlockExecutionDataEntity) { + header, err := node.Storage.Headers.ByBlockID(data.BlockID) + if err != nil { + // if the execution data is available, the block must be locally finalized + node.Logger.Fatal().Err(err).Msg("failed to get header for execution data") + } + + if builder.ExecutionDataPruner != nil { + err = builder.ExecutionDataTracker.SetFulfilledHeight(header.Height) + if err != nil { + node.Logger.Fatal().Err(err).Msg("failed to set fulfilled height") + } + + builder.ExecutionDataPruner.NotifyFulfilledHeight(header.Height) + } + }) + + var prunerMetrics module.ExecutionDataPrunerMetrics = metrics.NewNoopCollector() + if node.MetricsEnabled { + prunerMetrics = metrics.NewExecutionDataPrunerCollector() + } + + var err error + builder.ExecutionDataPruner, err = pruner.NewPruner( + node.Logger, + prunerMetrics, + builder.ExecutionDataTracker, + pruner.WithPruneCallback(func(ctx context.Context) error { + return builder.ExecutionDataDatastore.CollectGarbage(ctx) + }), + pruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget), + pruner.WithThreshold(builder.executionDataPrunerThreshold), + ) + if err != nil { + return nil, fmt.Errorf("failed to create execution data pruner: %w", err) + } + + return builder.ExecutionDataPruner, nil }) if builder.publicNetworkExecutionDataEnabled { @@ -706,7 +785,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess net := builder.AccessNodeConfig.PublicNetworkConfig.Network var err error - builder.PublicBlobService, err = net.RegisterBlobService(channels.PublicExecutionDataService, ds, opts...) + builder.PublicBlobService, err = net.RegisterBlobService(channels.PublicExecutionDataService, builder.ExecutionDataDatastore, opts...) if err != nil { return nil, fmt.Errorf("could not register blob service: %w", err) } @@ -1179,6 +1258,14 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { "execution-data-max-retry-delay", defaultConfig.executionDataConfig.MaxRetryDelay, "maximum delay for exponential backoff when fetching execution data fails e.g. 5m") + flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, + "execution-data-height-range-target", + defaultConfig.executionDataPrunerHeightRangeTarget, + "number of blocks of Execution Data to keep on disk. 
older data is pruned") + flags.Uint64Var(&builder.executionDataPrunerThreshold, + "execution-data-height-range-threshold", + defaultConfig.executionDataPrunerThreshold, + "number of unpruned blocks of Execution Data beyond the height range target to allow before pruning") // Execution State Streaming API flags.Uint32Var(&builder.stateStreamConf.ExecutionDataCacheSize, "execution-data-cache-size", defaultConfig.stateStreamConf.ExecutionDataCacheSize, "block execution data cache size") diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index ee817aa985b..d7eae4c133b 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -14,6 +14,7 @@ import ( "time" "github.com/ipfs/boxo/bitswap" + "github.com/ipfs/go-cid" badger "github.com/ipfs/go-ds-badger2" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/host" @@ -64,6 +65,8 @@ import ( "github.com/onflow/flow-go/module/execution" "github.com/onflow/flow-go/module/executiondatasync/execution_data" execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" + "github.com/onflow/flow-go/module/executiondatasync/pruner" + "github.com/onflow/flow-go/module/executiondatasync/tracker" finalizer "github.com/onflow/flow-go/module/finalizer/consensus" "github.com/onflow/flow-go/module/grpcserver" "github.com/onflow/flow-go/module/id" @@ -127,37 +130,39 @@ import ( // For a node running as a standalone process, the config fields will be populated from the command line params, // while for a node running as a library, the config fields are expected to be initialized by the caller. type ObserverServiceConfig struct { - bootstrapNodeAddresses []string - bootstrapNodePublicKeys []string - observerNetworkingKeyPath string - bootstrapIdentities flow.IdentitySkeletonList // the identity list of bootstrap peers the node uses to discover other nodes - apiRatelimits map[string]int - apiBurstlimits map[string]int - rpcConf rpc.Config - rpcMetricsEnabled bool - registersDBPath string - checkpointFile string - apiTimeout time.Duration - stateStreamConf statestreambackend.Config - stateStreamFilterConf map[string]int - upstreamNodeAddresses []string - upstreamNodePublicKeys []string - upstreamIdentities flow.IdentitySkeletonList // the identity list of upstream peers the node uses to forward API requests to - scriptExecutorConfig query.QueryConfig - logTxTimeToFinalized bool - logTxTimeToExecuted bool - logTxTimeToFinalizedExecuted bool - executionDataSyncEnabled bool - executionDataIndexingEnabled bool - localServiceAPIEnabled bool - executionDataDir string - executionDataStartHeight uint64 - executionDataConfig edrequester.ExecutionDataConfig - scriptExecMinBlock uint64 - scriptExecMaxBlock uint64 - registerCacheType string - registerCacheSize uint - programCacheSize uint + bootstrapNodeAddresses []string + bootstrapNodePublicKeys []string + observerNetworkingKeyPath string + bootstrapIdentities flow.IdentitySkeletonList // the identity list of bootstrap peers the node uses to discover other nodes + apiRatelimits map[string]int + apiBurstlimits map[string]int + rpcConf rpc.Config + rpcMetricsEnabled bool + registersDBPath string + checkpointFile string + apiTimeout time.Duration + stateStreamConf statestreambackend.Config + stateStreamFilterConf map[string]int + upstreamNodeAddresses []string + upstreamNodePublicKeys []string + upstreamIdentities flow.IdentitySkeletonList // the identity list of upstream 
peers the node uses to forward API requests to + scriptExecutorConfig query.QueryConfig + logTxTimeToFinalized bool + logTxTimeToExecuted bool + logTxTimeToFinalizedExecuted bool + executionDataSyncEnabled bool + executionDataIndexingEnabled bool + executionDataPrunerHeightRangeTarget uint64 + executionDataPrunerThreshold uint64 + localServiceAPIEnabled bool + executionDataDir string + executionDataStartHeight uint64 + executionDataConfig edrequester.ExecutionDataConfig + scriptExecMinBlock uint64 + scriptExecMaxBlock uint64 + registerCacheType string + registerCacheSize uint + programCacheSize uint } // DefaultObserverServiceConfig defines all the default values for the ObserverServiceConfig @@ -201,27 +206,29 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig { HeartbeatInterval: subscription.DefaultHeartbeatInterval, RegisterIDsRequestLimit: state_stream.DefaultRegisterIDsRequestLimit, }, - stateStreamFilterConf: nil, - rpcMetricsEnabled: false, - apiRatelimits: nil, - apiBurstlimits: nil, - bootstrapNodeAddresses: []string{}, - bootstrapNodePublicKeys: []string{}, - observerNetworkingKeyPath: cmd.NotSet, - apiTimeout: 3 * time.Second, - upstreamNodeAddresses: []string{}, - upstreamNodePublicKeys: []string{}, - registersDBPath: filepath.Join(homedir, ".flow", "execution_state"), - checkpointFile: cmd.NotSet, - scriptExecutorConfig: query.NewDefaultConfig(), - logTxTimeToFinalized: false, - logTxTimeToExecuted: false, - logTxTimeToFinalizedExecuted: false, - executionDataSyncEnabled: false, - executionDataIndexingEnabled: false, - localServiceAPIEnabled: false, - executionDataDir: filepath.Join(homedir, ".flow", "execution_data"), - executionDataStartHeight: 0, + stateStreamFilterConf: nil, + rpcMetricsEnabled: false, + apiRatelimits: nil, + apiBurstlimits: nil, + bootstrapNodeAddresses: []string{}, + bootstrapNodePublicKeys: []string{}, + observerNetworkingKeyPath: cmd.NotSet, + apiTimeout: 3 * time.Second, + upstreamNodeAddresses: []string{}, + upstreamNodePublicKeys: []string{}, + registersDBPath: filepath.Join(homedir, ".flow", "execution_state"), + checkpointFile: cmd.NotSet, + scriptExecutorConfig: query.NewDefaultConfig(), + logTxTimeToFinalized: false, + logTxTimeToExecuted: false, + logTxTimeToFinalizedExecuted: false, + executionDataSyncEnabled: false, + executionDataIndexingEnabled: false, + executionDataPrunerHeightRangeTarget: 0, + executionDataPrunerThreshold: 100_000, + localServiceAPIEnabled: false, + executionDataDir: filepath.Join(homedir, ".flow", "execution_data"), + executionDataStartHeight: 0, executionDataConfig: edrequester.ExecutionDataConfig{ InitialBlockHeight: 0, MaxSearchAhead: edrequester.DefaultMaxSearchAhead, @@ -264,6 +271,10 @@ type ObserverServiceBuilder struct { ExecutionDataDownloader execution_data.Downloader ExecutionDataRequester state_synchronization.ExecutionDataRequester ExecutionDataStore execution_data.ExecutionDataStore + ExecutionDataBlobstore blobs.Blobstore + ExecutionDataPruner *pruner.Pruner + ExecutionDataDatastore *badger.Datastore + ExecutionDataTracker tracker.Storage RegistersAsyncStore *execution.RegistersAsyncStore Reporter *index.Reporter @@ -657,6 +668,16 @@ func (builder *ObserverServiceBuilder) extraFlags() { flags.StringVar(&builder.registersDBPath, "execution-state-dir", defaultConfig.registersDBPath, "directory to use for execution-state database") flags.StringVar(&builder.checkpointFile, "execution-state-checkpoint", defaultConfig.checkpointFile, "execution-state checkpoint file") + // Execution data pruner + 
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, + "execution-data-height-range-target", + defaultConfig.executionDataPrunerHeightRangeTarget, + "number of blocks of Execution Data to keep on disk. older data is pruned") + flags.Uint64Var(&builder.executionDataPrunerThreshold, + "execution-data-height-range-threshold", + defaultConfig.executionDataPrunerThreshold, + "number of unpruned blocks of Execution Data beyond the height range target to allow before pruning") + // ExecutionDataRequester config flags.BoolVar(&builder.executionDataSyncEnabled, "execution-data-sync-enabled", @@ -1059,7 +1080,6 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) { } func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder { - var ds *badger.Datastore var bs network.BlobService var processedBlockHeight storage.ConsumerProgress var processedNotifications storage.ConsumerProgress @@ -1083,13 +1103,13 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS return err } - ds, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions) + builder.ExecutionDataDatastore, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions) if err != nil { return err } builder.ShutdownFunc(func() error { - if err := ds.Close(); err != nil { + if err := builder.ExecutionDataDatastore.Close(); err != nil { return fmt.Errorf("could not close execution data datastore: %w", err) } return nil @@ -1100,13 +1120,13 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS Module("processed block height consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. - processedBlockHeight = bstorage.NewConsumerProgress(ds.DB, module.ConsumeProgressExecutionDataRequesterBlockHeight) + processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterBlockHeight) return nil }). Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. - processedNotifications = bstorage.NewConsumerProgress(ds.DB, module.ConsumeProgressExecutionDataRequesterNotification) + processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterNotification) return nil }). Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error { @@ -1115,8 +1135,8 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS return nil }). Module("execution datastore", func(node *cmd.NodeConfig) error { - blobstore := blobs.NewBlobstore(ds) - builder.ExecutionDataStore = execution_data.NewExecutionDataStore(blobstore, execution_data.DefaultSerializer) + builder.ExecutionDataBlobstore = blobs.NewBlobstore(builder.ExecutionDataDatastore) + builder.ExecutionDataStore = execution_data.NewExecutionDataStore(builder.ExecutionDataBlobstore, execution_data.DefaultSerializer) return nil }). Module("execution data cache", func(node *cmd.NodeConfig) error { @@ -1150,7 +1170,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS } var err error - bs, err = node.EngineRegistry.RegisterBlobService(channels.PublicExecutionDataService, ds, opts...) 
+ bs, err = node.EngineRegistry.RegisterBlobService(channels.PublicExecutionDataService, builder.ExecutionDataDatastore, opts...) if err != nil { return nil, fmt.Errorf("could not register blob service: %w", err) } @@ -1160,7 +1180,34 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS // to be ready before starting publicBsDependable.Init(bs) - builder.ExecutionDataDownloader = execution_data.NewDownloader(bs) + var downloaderOpts []execution_data.DownloaderOption + + if builder.executionDataPrunerHeightRangeTarget != 0 { + sealed, err := node.State.Sealed().Head() + if err != nil { + return nil, fmt.Errorf("cannot get the sealed block: %w", err) + } + + trackerDir := filepath.Join(builder.executionDataDir, "tracker") + builder.ExecutionDataTracker, err = tracker.OpenStorage( + trackerDir, + sealed.Height, + node.Logger, + tracker.WithPruneCallback(func(c cid.Cid) error { + // TODO: use a proper context here + return builder.ExecutionDataBlobstore.DeleteBlob(context.TODO(), c) + }), + ) + if err != nil { + return nil, fmt.Errorf("failed to create execution data tracker: %w", err) + } + + downloaderOpts = []execution_data.DownloaderOption{ + execution_data.WithExecutionDataTracker(builder.ExecutionDataTracker, node.Storage.Headers), + } + } + + builder.ExecutionDataDownloader = execution_data.NewDownloader(bs, downloaderOpts...) return builder.ExecutionDataDownloader, nil }). @@ -1232,8 +1279,50 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS requesterDependable.Init(builder.ExecutionDataRequester) return builder.ExecutionDataRequester, nil - }) + }). + Component("execution data pruner", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { + if builder.executionDataPrunerHeightRangeTarget == 0 { + return &module.NoopReadyDoneAware{}, nil + } + execDataDistributor.AddOnExecutionDataReceivedConsumer(func(data *execution_data.BlockExecutionDataEntity) { + header, err := node.Storage.Headers.ByBlockID(data.BlockID) + if err != nil { + node.Logger.Fatal().Err(err).Msg("failed to get header for execution data") + } + + if builder.ExecutionDataPruner != nil { + err = builder.ExecutionDataTracker.SetFulfilledHeight(header.Height) + if err != nil { + node.Logger.Fatal().Err(err).Msg("failed to set fulfilled height") + } + + builder.ExecutionDataPruner.NotifyFulfilledHeight(header.Height) + } + }) + + var prunerMetrics module.ExecutionDataPrunerMetrics = metrics.NewNoopCollector() + if node.MetricsEnabled { + prunerMetrics = metrics.NewExecutionDataPrunerCollector() + } + + var err error + builder.ExecutionDataPruner, err = pruner.NewPruner( + node.Logger, + prunerMetrics, + builder.ExecutionDataTracker, + pruner.WithPruneCallback(func(ctx context.Context) error { + return builder.ExecutionDataDatastore.CollectGarbage(ctx) + }), + pruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget), + pruner.WithThreshold(builder.executionDataPrunerThreshold), + ) + if err != nil { + return nil, fmt.Errorf("failed to create execution data pruner: %w", err) + } + + return builder.ExecutionDataPruner, nil + }) if builder.executionDataIndexingEnabled { var indexedBlockHeight storage.ConsumerProgress diff --git a/module/executiondatasync/execution_data/downloader.go b/module/executiondatasync/execution_data/downloader.go index 71905342c33..8ae1a000d1c 100644 --- a/module/executiondatasync/execution_data/downloader.go +++ b/module/executiondatasync/execution_data/downloader.go @@ -12,7 +12,9 @@ import ( 
"github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/module/blobs" + "github.com/onflow/flow-go/module/executiondatasync/tracker" "github.com/onflow/flow-go/network" + "github.com/onflow/flow-go/storage" ) // Downloader is used to download execution data blobs from the network via a blob service. @@ -27,6 +29,8 @@ type downloader struct { blobService network.BlobService maxBlobSize int serializer Serializer + storage tracker.Storage + headers storage.Headers } type DownloaderOption func(*downloader) @@ -38,12 +42,20 @@ func WithSerializer(serializer Serializer) DownloaderOption { } } +// WithExecutionDataTracker configures the execution data tracker and the storage headers for the downloader +func WithExecutionDataTracker(storage tracker.Storage, headers storage.Headers) DownloaderOption { + return func(d *downloader) { + d.storage = storage + d.headers = headers + } +} + // NewDownloader creates a new Downloader instance func NewDownloader(blobService network.BlobService, opts ...DownloaderOption) *downloader { d := &downloader{ - blobService, - DefaultMaxBlobSize, - DefaultSerializer, + blobService: blobService, + maxBlobSize: DefaultMaxBlobSize, + serializer: DefaultSerializer, } for _, opt := range opts { @@ -83,12 +95,15 @@ func (d *downloader) Get(ctx context.Context, executionDataID flow.Identifier) ( // Next, download each of the chunk execution data blobs chunkExecutionDatas := make([]*ChunkExecutionData, len(edRoot.ChunkExecutionDataIDs)) + // Execution data cids + var edCids = []cid.Cid{flow.IdToCid(executionDataID)} + for i, chunkDataID := range edRoot.ChunkExecutionDataIDs { i := i chunkDataID := chunkDataID g.Go(func() error { - ced, err := d.getChunkExecutionData( + ced, cids, err := d.getChunkExecutionData( gCtx, chunkDataID, blobGetter, @@ -99,6 +114,7 @@ func (d *downloader) Get(ctx context.Context, executionDataID flow.Identifier) ( } chunkExecutionDatas[i] = ced + edCids = append(edCids, cids...) return nil }) @@ -108,6 +124,11 @@ func (d *downloader) Get(ctx context.Context, executionDataID flow.Identifier) ( return nil, err } + err = d.trackBlobs(edRoot.BlockID, edCids) + if err != nil { + return nil, fmt.Errorf("failed to track blob: %w", err) + } + // Finally, recombine data into original record. bed := &BlockExecutionData{ BlockID: edRoot.BlockID, @@ -160,7 +181,7 @@ func (d *downloader) getExecutionDataRoot( } // getChunkExecutionData downloads a chunk execution data blob from the network and returns the -// deserialized ChunkExecutionData struct. +// deserialized ChunkExecutionData struct with list of cids from all levels of the blob tree. 
// // Expected errors during normal operations: // - context.Canceled or context.DeadlineExceeded if the context is canceled or times out @@ -171,28 +192,59 @@ func (d *downloader) getChunkExecutionData( ctx context.Context, chunkExecutionDataID cid.Cid, blobGetter network.BlobGetter, -) (*ChunkExecutionData, error) { +) (*ChunkExecutionData, []cid.Cid, error) { cids := []cid.Cid{chunkExecutionDataID} + cidsFromAllLevels := []cid.Cid{chunkExecutionDataID} // iteratively process each level of the blob tree until a ChunkExecutionData is returned or an // error is encountered for i := 0; ; i++ { v, err := d.getBlobs(ctx, blobGetter, cids) if err != nil { - return nil, fmt.Errorf("failed to get level %d of blob tree: %w", i, err) + return nil, nil, fmt.Errorf("failed to get level %d of blob tree: %w", i, err) } switch v := v.(type) { case *ChunkExecutionData: - return v, nil + return v, cidsFromAllLevels, nil case *[]cid.Cid: + cidsFromAllLevels = append(cidsFromAllLevels, *v...) cids = *v default: - return nil, NewMalformedDataError(fmt.Errorf("blob tree contains unexpected type %T at level %d", v, i)) + return nil, nil, NewMalformedDataError(fmt.Errorf("blob tree contains unexpected type %T at level %d", v, i)) } } } +// trackBlobs updates the storage to track the provided CIDs for a given block. +// This is used to ensure that the blobs can be pruned later. +// +// Parameters: +// - blockID: The identifier of the block to which the blobs belong. +// - cids: CIDs to be tracked. +// +// No errors are expected during normal operations. +func (d *downloader) trackBlobs(blockID flow.Identifier, cids []cid.Cid) error { + if d.storage == nil || d.headers == nil { + return nil + } + + return d.storage.Update(func(trackBlobs tracker.TrackBlobsFn) error { + header, err := d.headers.ByBlockID(blockID) + if err != nil { + return err + } + + // track new blobs so that they can be pruned later + err = trackBlobs(header.Height, cids...) + if err != nil { + return err + } + + return nil + }) +} + // getBlobs gets the given CIDs from the blobservice, reassembles the blobs, and deserializes the reassembled data into an object. 
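Every level of the blob tree is tracked, not just the leaves: pruning deletes blob by CID, so an untracked intermediate node would never be garbage collected. A simplified, self-contained sketch of the level-order collection that `getChunkExecutionData` performs above (`children` stands in for fetching a blob and deserializing it into its child CID list):

```go
package example

import "github.com/ipfs/go-cid"

// collectTreeCids walks a blob tree level by level, returning the root, every
// intermediate node, and every leaf CID. children is a stand-in for fetching a
// blob and deserializing it into its child CIDs; leaves return an empty slice,
// which terminates the walk.
func collectTreeCids(root cid.Cid, children func(cid.Cid) []cid.Cid) []cid.Cid {
	all := []cid.Cid{root}
	level := []cid.Cid{root}
	for len(level) > 0 {
		var next []cid.Cid
		for _, c := range level {
			next = append(next, children(c)...)
		}
		all = append(all, next...)
		level = next
	}
	return all
}
```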
 // getBlobs gets the given CIDs from the blobservice, reassembles the blobs, and deserializes the reassembled data into an object.
 //
 // Expected errors during normal operations:
diff --git a/module/executiondatasync/tracker/storage.go b/module/executiondatasync/tracker/storage.go
index 4c47ccad5ca..c7677f79ca7 100644
--- a/module/executiondatasync/tracker/storage.go
+++ b/module/executiondatasync/tracker/storage.go
@@ -52,13 +52,13 @@ const blobRecordKeyLength = 1 + 8 + blobs.CidLength
 func makeBlobRecordKey(blockHeight uint64, c cid.Cid) []byte {
 	blobRecordKey := make([]byte, blobRecordKeyLength)
 	blobRecordKey[0] = prefixBlobRecord
-	binary.LittleEndian.PutUint64(blobRecordKey[1:], blockHeight)
+	binary.BigEndian.PutUint64(blobRecordKey[1:], blockHeight)
 	copy(blobRecordKey[1+8:], c.Bytes())
 	return blobRecordKey
 }
 
 func parseBlobRecordKey(key []byte) (uint64, cid.Cid, error) {
-	blockHeight := binary.LittleEndian.Uint64(key[1:])
+	blockHeight := binary.BigEndian.Uint64(key[1:])
 	c, err := cid.Cast(key[1+8:])
 	return blockHeight, c, err
 }
@@ -74,7 +74,7 @@ func makeLatestHeightKey(c cid.Cid) []byte {
 
 func makeUint64Value(v uint64) []byte {
 	value := make([]byte, 8)
-	binary.LittleEndian.PutUint64(value, v)
+	binary.BigEndian.PutUint64(value, v)
 	return value
 }
 
@@ -84,7 +84,7 @@ func getUint64Value(item *badger.Item) (uint64, error) {
 		return 0, err
 	}
 
-	return binary.LittleEndian.Uint64(value), nil
+	return binary.BigEndian.Uint64(value), nil
 }
 
 // getBatchItemCountLimit returns the maximum number of items that can be included in a single batch
diff --git a/module/executiondatasync/tracker/storage_test.go b/module/executiondatasync/tracker/storage_test.go
index b0078065642..76c7b613ab6 100644
--- a/module/executiondatasync/tracker/storage_test.go
+++ b/module/executiondatasync/tracker/storage_test.go
@@ -120,3 +120,66 @@ func TestPruneNonLatestHeight(t *testing.T) {
 	})
 	require.NoError(t, err)
 }
+
+// TestAscendingOrderOfRecords tests that blob records are iterated in ascending height
+// order, and that all CIDs at or below the pruned height, along with their associated
+// tracking data, are removed from the database.
+func TestAscendingOrderOfRecords(t *testing.T) {
+	expectedPrunedCIDs := make(map[cid.Cid]struct{})
+	storageDir := t.TempDir()
+	storage, err := OpenStorage(storageDir, 0, zerolog.Nop(), WithPruneCallback(func(c cid.Cid) error {
+		_, ok := expectedPrunedCIDs[c]
+		assert.True(t, ok, "unexpected CID pruned: %s", c.String())
+		delete(expectedPrunedCIDs, c)
+		return nil
+	}))
+	require.NoError(t, err)
+
+	// c1 is for height 1,
+	// c2 is for height 2,
+	// c3 is for height 256.
+	// Pruning up to height 1 verifies that records are visited in ascending height
+	// order: only c1 should be pruned.
+	c1 := randomCid()
+	expectedPrunedCIDs[c1] = struct{}{}
+	c2 := randomCid()
+	c3 := randomCid()
+
+	require.NoError(t, storage.Update(func(tbf TrackBlobsFn) error {
+		require.NoError(t, tbf(1, c1))
+		require.NoError(t, tbf(2, c2))
+		// It is important to check that the record with height 256 does not precede
+		// the record with height 1 during pruning (with little-endian keys it would).
+		require.NoError(t, tbf(256, c3))
+
+		return nil
+	}))
+	require.NoError(t, storage.PruneUpToHeight(1))
+
+	prunedHeight, err := storage.GetPrunedHeight()
+	require.NoError(t, err)
+	assert.Equal(t, uint64(1), prunedHeight)
+
+	assert.Len(t, expectedPrunedCIDs, 0)
+
+	err = storage.db.View(func(txn *badger.Txn) error {
+		// expect the blob record with height 1 to have been removed
+		_, err := txn.Get(makeBlobRecordKey(1, c1))
+		assert.ErrorIs(t, err, badger.ErrKeyNotFound)
+		_, err = txn.Get(makeLatestHeightKey(c1))
+		assert.ErrorIs(t, err, badger.ErrKeyNotFound)
+
+		// expect the blob record with height 2 to still exist
+		_, err = txn.Get(makeBlobRecordKey(2, c2))
+		assert.NoError(t, err)
+		_, err = txn.Get(makeLatestHeightKey(c2))
+		assert.NoError(t, err)
+
+		// expect the blob record with height 256 to still exist
+		_, err = txn.Get(makeBlobRecordKey(256, c3))
+		assert.NoError(t, err)
+		_, err = txn.Get(makeLatestHeightKey(c3))
+		assert.NoError(t, err)
+
+		return nil
+	})
+	require.NoError(t, err)
+}
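The endianness switch in `storage.go` is the heart of this fix: Badger iterates keys in lexicographic byte order, and little-endian heights do not sort numerically, which is exactly the 1-versus-256 case the new test pins down. A standalone demonstration:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// Shows why the tracker's blob-record keys moved from little-endian to
// big-endian heights: Badger iterates keys in lexicographic byte order, and
// only big-endian encoding makes that order match numeric height order.
func main() {
	le := func(h uint64) []byte { b := make([]byte, 8); binary.LittleEndian.PutUint64(b, h); return b }
	be := func(h uint64) []byte { b := make([]byte, 8); binary.BigEndian.PutUint64(b, h); return b }

	// Little-endian: the key for height 256 sorts BEFORE the key for height 1,
	// so an ascending prune scan would see height 256 first, conclude it has
	// passed the prune height, and stop before ever reaching the height-1 record.
	fmt.Println(bytes.Compare(le(256), le(1))) // -1
	// Big-endian: byte order matches numeric order.
	fmt.Println(bytes.Compare(be(256), be(1))) // 1
}
```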