Skip to content

Commit

Permalink
Merge pull request onflow#6109 from The-K-R-O-K/UlyanaAndrukhiv/6002-…
Browse files Browse the repository at this point in the history
…exec-data-pruning

[Access] Test Execution Data db pruning functionality on Access/Observer nodes
  • Loading branch information
peterargue authored Jul 5, 2024
2 parents b292e24 + 55fb3fa commit bfb4f89
Show file tree
Hide file tree
Showing 5 changed files with 420 additions and 129 deletions.
195 changes: 141 additions & 54 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/go-cid"
badger "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
Expand Down Expand Up @@ -65,6 +66,8 @@ import (
"github.com/onflow/flow-go/module/execution"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
"github.com/onflow/flow-go/module/executiondatasync/pruner"
"github.com/onflow/flow-go/module/executiondatasync/tracker"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/grpcserver"
"github.com/onflow/flow-go/module/id"
Expand Down Expand Up @@ -124,40 +127,42 @@ import (
// For a node running as a standalone process, the config fields will be populated from the command line params,
// while for a node running as a library, the config fields are expected to be initialized by the caller.
type AccessNodeConfig struct {
supportsObserver bool // True if this is an Access node that supports observers and consensus follower engines
collectionGRPCPort uint
executionGRPCPort uint
pingEnabled bool
nodeInfoFile string
apiRatelimits map[string]int
apiBurstlimits map[string]int
rpcConf rpc.Config
stateStreamConf statestreambackend.Config
stateStreamFilterConf map[string]int
ExecutionNodeAddress string // deprecated
HistoricalAccessRPCs []access.AccessAPIClient
logTxTimeToFinalized bool
logTxTimeToExecuted bool
logTxTimeToFinalizedExecuted bool
retryEnabled bool
rpcMetricsEnabled bool
executionDataSyncEnabled bool
publicNetworkExecutionDataEnabled bool
executionDataDir string
executionDataStartHeight uint64
executionDataConfig edrequester.ExecutionDataConfig
PublicNetworkConfig PublicNetworkConfig
TxResultCacheSize uint
TxErrorMessagesCacheSize uint
executionDataIndexingEnabled bool
registersDBPath string
checkpointFile string
scriptExecutorConfig query.QueryConfig
scriptExecMinBlock uint64
scriptExecMaxBlock uint64
registerCacheType string
registerCacheSize uint
programCacheSize uint
supportsObserver bool // True if this is an Access node that supports observers and consensus follower engines
collectionGRPCPort uint
executionGRPCPort uint
pingEnabled bool
nodeInfoFile string
apiRatelimits map[string]int
apiBurstlimits map[string]int
rpcConf rpc.Config
stateStreamConf statestreambackend.Config
stateStreamFilterConf map[string]int
ExecutionNodeAddress string // deprecated
HistoricalAccessRPCs []access.AccessAPIClient
logTxTimeToFinalized bool
logTxTimeToExecuted bool
logTxTimeToFinalizedExecuted bool
retryEnabled bool
rpcMetricsEnabled bool
executionDataSyncEnabled bool
publicNetworkExecutionDataEnabled bool
executionDataPrunerHeightRangeTarget uint64
executionDataPrunerThreshold uint64
executionDataDir string
executionDataStartHeight uint64
executionDataConfig edrequester.ExecutionDataConfig
PublicNetworkConfig PublicNetworkConfig
TxResultCacheSize uint
TxErrorMessagesCacheSize uint
executionDataIndexingEnabled bool
registersDBPath string
checkpointFile string
scriptExecutorConfig query.QueryConfig
scriptExecMinBlock uint64
scriptExecMaxBlock uint64
registerCacheType string
registerCacheSize uint
programCacheSize uint
}

type PublicNetworkConfig struct {
Expand Down Expand Up @@ -246,15 +251,17 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
RetryDelay: edrequester.DefaultRetryDelay,
MaxRetryDelay: edrequester.DefaultMaxRetryDelay,
},
executionDataIndexingEnabled: false,
registersDBPath: filepath.Join(homedir, ".flow", "execution_state"),
checkpointFile: cmd.NotSet,
scriptExecutorConfig: query.NewDefaultConfig(),
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pStorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
executionDataIndexingEnabled: false,
executionDataPrunerHeightRangeTarget: 0,
executionDataPrunerThreshold: 100_000,
registersDBPath: filepath.Join(homedir, ".flow", "execution_state"),
checkpointFile: cmd.NotSet,
scriptExecutorConfig: query.NewDefaultConfig(),
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pStorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
}
}

Expand Down Expand Up @@ -288,6 +295,7 @@ type FlowAccessNodeBuilder struct {
PublicBlobService network.BlobService
ExecutionDataRequester state_synchronization.ExecutionDataRequester
ExecutionDataStore execution_data.ExecutionDataStore
ExecutionDataBlobstore blobs.Blobstore
ExecutionDataCache *execdatacache.ExecutionDataCache
ExecutionIndexer *indexer.Indexer
ExecutionIndexerCore *indexer.IndexerCore
Expand All @@ -298,6 +306,9 @@ type FlowAccessNodeBuilder struct {
TxResultsIndex *index.TransactionResultsIndex
IndexerDependencies *cmd.DependencyList
collectionExecutedMetric module.CollectionExecutedMetric
ExecutionDataPruner *pruner.Pruner
ExecutionDataDatastore *badger.Datastore
ExecutionDataTracker tracker.Storage

// The sync engine participants provider is the libp2p peer store for the access node
// which is not available until after the network has started.
Expand Down Expand Up @@ -502,7 +513,6 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
}

func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
var ds *badger.Datastore
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
var processedNotifications storage.ConsumerProgress
Expand All @@ -526,13 +536,12 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return err
}

ds, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions)
builder.ExecutionDataDatastore, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions)
if err != nil {
return err
}

builder.ShutdownFunc(func() error {
if err := ds.Close(); err != nil {
if err := builder.ExecutionDataDatastore.Close(); err != nil {
return fmt.Errorf("could not close execution data datastore: %w", err)
}
return nil
Expand All @@ -543,13 +552,13 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
processedBlockHeight = bstorage.NewConsumerProgress(ds.DB, module.ConsumeProgressExecutionDataRequesterBlockHeight)
processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterBlockHeight)
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
processedNotifications = bstorage.NewConsumerProgress(ds.DB, module.ConsumeProgressExecutionDataRequesterNotification)
processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterNotification)
return nil
}).
Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
Expand All @@ -558,8 +567,8 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil
}).
Module("execution datastore", func(node *cmd.NodeConfig) error {
blobstore := blobs.NewBlobstore(ds)
builder.ExecutionDataStore = execution_data.NewExecutionDataStore(blobstore, execution_data.DefaultSerializer)
builder.ExecutionDataBlobstore = blobs.NewBlobstore(builder.ExecutionDataDatastore)
builder.ExecutionDataStore = execution_data.NewExecutionDataStore(builder.ExecutionDataBlobstore, execution_data.DefaultSerializer)
return nil
}).
Module("execution data cache", func(node *cmd.NodeConfig) error {
Expand Down Expand Up @@ -601,7 +610,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}

var err error
bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, ds, opts...)
bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, builder.ExecutionDataDatastore, opts...)
if err != nil {
return nil, fmt.Errorf("could not register blob service: %w", err)
}
Expand All @@ -611,8 +620,34 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
// to be ready before starting
bsDependable.Init(bs)

builder.ExecutionDataDownloader = execution_data.NewDownloader(bs)
var downloaderOpts []execution_data.DownloaderOption

if builder.executionDataPrunerHeightRangeTarget != 0 {
sealed, err := node.State.Sealed().Head()
if err != nil {
return nil, fmt.Errorf("cannot get the sealed block: %w", err)
}

trackerDir := filepath.Join(builder.executionDataDir, "tracker")
builder.ExecutionDataTracker, err = tracker.OpenStorage(
trackerDir,
sealed.Height,
node.Logger,
tracker.WithPruneCallback(func(c cid.Cid) error {
// TODO: use a proper context here
return builder.ExecutionDataBlobstore.DeleteBlob(context.TODO(), c)
}),
)
if err != nil {
return nil, fmt.Errorf("failed to create execution data tracker: %w", err)
}

downloaderOpts = []execution_data.DownloaderOption{
execution_data.WithExecutionDataTracker(builder.ExecutionDataTracker, node.Storage.Headers),
}
}

builder.ExecutionDataDownloader = execution_data.NewDownloader(bs, downloaderOpts...)
return builder.ExecutionDataDownloader, nil
}).
Component("execution data requester", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
Expand Down Expand Up @@ -683,6 +718,50 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
requesterDependable.Init(builder.ExecutionDataRequester)

return builder.ExecutionDataRequester, nil
}).
Component("execution data pruner", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
if builder.executionDataPrunerHeightRangeTarget == 0 {
return &module.NoopReadyDoneAware{}, nil
}

execDataDistributor.AddOnExecutionDataReceivedConsumer(func(data *execution_data.BlockExecutionDataEntity) {
header, err := node.Storage.Headers.ByBlockID(data.BlockID)
if err != nil {
// if the execution data is available, the block must be locally finalized
node.Logger.Fatal().Err(err).Msg("failed to get header for execution data")
}

if builder.ExecutionDataPruner != nil {
err = builder.ExecutionDataTracker.SetFulfilledHeight(header.Height)
if err != nil {
node.Logger.Fatal().Err(err).Msg("failed to set fulfilled height")
}

builder.ExecutionDataPruner.NotifyFulfilledHeight(header.Height)
}
})

var prunerMetrics module.ExecutionDataPrunerMetrics = metrics.NewNoopCollector()
if node.MetricsEnabled {
prunerMetrics = metrics.NewExecutionDataPrunerCollector()
}

var err error
builder.ExecutionDataPruner, err = pruner.NewPruner(
node.Logger,
prunerMetrics,
builder.ExecutionDataTracker,
pruner.WithPruneCallback(func(ctx context.Context) error {
return builder.ExecutionDataDatastore.CollectGarbage(ctx)
}),
pruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget),
pruner.WithThreshold(builder.executionDataPrunerThreshold),
)
if err != nil {
return nil, fmt.Errorf("failed to create execution data pruner: %w", err)
}

return builder.ExecutionDataPruner, nil
})

if builder.publicNetworkExecutionDataEnabled {
Expand All @@ -706,7 +785,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
net := builder.AccessNodeConfig.PublicNetworkConfig.Network

var err error
builder.PublicBlobService, err = net.RegisterBlobService(channels.PublicExecutionDataService, ds, opts...)
builder.PublicBlobService, err = net.RegisterBlobService(channels.PublicExecutionDataService, builder.ExecutionDataDatastore, opts...)
if err != nil {
return nil, fmt.Errorf("could not register blob service: %w", err)
}
Expand Down Expand Up @@ -1179,6 +1258,14 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"execution-data-max-retry-delay",
defaultConfig.executionDataConfig.MaxRetryDelay,
"maximum delay for exponential backoff when fetching execution data fails e.g. 5m")
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget,
"execution-data-height-range-target",
defaultConfig.executionDataPrunerHeightRangeTarget,
"number of blocks of Execution Data to keep on disk. older data is pruned")
flags.Uint64Var(&builder.executionDataPrunerThreshold,
"execution-data-height-range-threshold",
defaultConfig.executionDataPrunerThreshold,
"number of unpruned blocks of Execution Data beyond the height range target to allow before pruning")

// Execution State Streaming API
flags.Uint32Var(&builder.stateStreamConf.ExecutionDataCacheSize, "execution-data-cache-size", defaultConfig.stateStreamConf.ExecutionDataCacheSize, "block execution data cache size")
Expand Down
Loading

0 comments on commit bfb4f89

Please sign in to comment.