Skip to content

Commit

Permalink
Merge pull request #6180 from The-K-R-O-K/UlyanaAndrukhiv/6017-pebble-as-execution-datastore-db
Browse files Browse the repository at this point in the history

[State Sync] Experiment with using pebble as the execution datastore db
  • Loading branch information
peterargue authored Jul 31, 2024
2 parents 60e14fd + 406a64a commit d719f5f
Show file tree
Hide file tree
Showing 20 changed files with 548 additions and 61 deletions.
72 changes: 53 additions & 19 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ import (
"strings"
"time"

"github.com/cockroachdb/pebble"
"github.com/dgraph-io/badger/v2"
"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/go-cid"
badger "github.com/ipfs/go-ds-badger2"
"github.com/ipfs/go-datastore"
badgerds "github.com/ipfs/go-ds-badger2"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/onflow/crypto"
Expand Down Expand Up @@ -69,6 +72,7 @@ import (
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
"github.com/onflow/flow-go/module/executiondatasync/pruner"
edstorage "github.com/onflow/flow-go/module/executiondatasync/storage"
"github.com/onflow/flow-go/module/executiondatasync/tracker"
finalizer "github.com/onflow/flow-go/module/finalizer/consensus"
"github.com/onflow/flow-go/module/grpcserver"
Expand Down Expand Up @@ -106,7 +110,7 @@ import (
"github.com/onflow/flow-go/state/protocol/blocktimer"
"github.com/onflow/flow-go/storage"
bstorage "github.com/onflow/flow-go/storage/badger"
pStorage "github.com/onflow/flow-go/storage/pebble"
pstorage "github.com/onflow/flow-go/storage/pebble"
"github.com/onflow/flow-go/utils/grpcutils"
)

Expand Down Expand Up @@ -148,6 +152,7 @@ type AccessNodeConfig struct {
rpcMetricsEnabled bool
executionDataSyncEnabled bool
publicNetworkExecutionDataEnabled bool
executionDataDBMode string
executionDataPrunerHeightRangeTarget uint64
executionDataPrunerThreshold uint64
executionDataPruningInterval time.Duration
Expand Down Expand Up @@ -257,6 +262,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
MaxRetryDelay: edrequester.DefaultMaxRetryDelay,
},
executionDataIndexingEnabled: false,
executionDataDBMode: execution_data.ExecutionDataDBModeBadger.String(),
executionDataPrunerHeightRangeTarget: 0,
executionDataPrunerThreshold: pruner.DefaultThreshold,
executionDataPruningInterval: pruner.DefaultPruningInterval,
Expand All @@ -265,7 +271,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
scriptExecutorConfig: query.NewDefaultConfig(),
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pStorage.CacheTypeTwoQueue.String(),
registerCacheType: pstorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
checkPayerBalance: false,
Expand Down Expand Up @@ -315,7 +321,7 @@ type FlowAccessNodeBuilder struct {
IndexerDependencies *cmd.DependencyList
collectionExecutedMetric module.CollectionExecutedMetric
ExecutionDataPruner *pruner.Pruner
ExecutionDataDatastore *badger.Datastore
ExecutionDatastoreManager edstorage.DatastoreManager
ExecutionDataTracker tracker.Storage
versionControl *version.VersionControl

Expand Down Expand Up @@ -522,13 +528,15 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
}

func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
var ds datastore.Batching
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
var processedNotifications storage.ConsumerProgress
var bsDependable *module.ProxiedReadyDoneAware
var execDataDistributor *edrequester.ExecutionDataDistributor
var execDataCacheBackend *herocache.BlockExecutionData
var executionDataStoreCache *execdatacache.ExecutionDataCache
var executionDataDBMode execution_data.ExecutionDataDBMode

// setup dependency chain to ensure indexer starts after the requester
requesterDependable := module.NewProxiedReadyDoneAware()
Expand All @@ -547,12 +555,26 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return err
}

builder.ExecutionDataDatastore, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions)
executionDataDBMode, err = execution_data.ParseExecutionDataDBMode(builder.executionDataDBMode)
if err != nil {
return err
return fmt.Errorf("could not parse execution data DB mode: %w", err)
}

if executionDataDBMode == execution_data.ExecutionDataDBModePebble {
builder.ExecutionDatastoreManager, err = edstorage.NewPebbleDatastoreManager(datastoreDir, nil)
if err != nil {
return fmt.Errorf("could not create PebbleDatastoreManager for execution data: %w", err)
}
} else {
builder.ExecutionDatastoreManager, err = edstorage.NewBadgerDatastoreManager(datastoreDir, &badgerds.DefaultOptions)
if err != nil {
return fmt.Errorf("could not create BadgerDatastoreManager for execution data: %w", err)
}
}
ds = builder.ExecutionDatastoreManager.Datastore()

builder.ShutdownFunc(func() error {
if err := builder.ExecutionDataDatastore.Close(); err != nil {
if err := builder.ExecutionDatastoreManager.Close(); err != nil {
return fmt.Errorf("could not close execution data datastore: %w", err)
}
return nil
Expand All @@ -563,13 +585,21 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
Module("processed block height consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterBlockHeight)
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
} else {
processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight)
}
return nil
}).
Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error {
// Note: progress is stored in the datastore's DB since that is where the jobqueue
// writes execution data to.
processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterNotification)
if executionDataDBMode == execution_data.ExecutionDataDBModeBadger {
processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification)
} else {
processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification)
}
return nil
}).
Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error {
Expand All @@ -578,7 +608,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return nil
}).
Module("execution datastore", func(node *cmd.NodeConfig) error {
builder.ExecutionDataBlobstore = blobs.NewBlobstore(builder.ExecutionDataDatastore)
builder.ExecutionDataBlobstore = blobs.NewBlobstore(ds)
builder.ExecutionDataStore = execution_data.NewExecutionDataStore(builder.ExecutionDataBlobstore, execution_data.DefaultSerializer)
return nil
}).
Expand Down Expand Up @@ -621,7 +651,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}

var err error
bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, builder.ExecutionDataDatastore, opts...)
bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, ds, opts...)
if err != nil {
return nil, fmt.Errorf("could not register blob service: %w", err)
}
Expand Down Expand Up @@ -746,7 +776,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
prunerMetrics,
builder.ExecutionDataTracker,
pruner.WithPruneCallback(func(ctx context.Context) error {
return builder.ExecutionDataDatastore.CollectGarbage(ctx)
return builder.ExecutionDatastoreManager.CollectGarbage(ctx)
}),
pruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget),
pruner.WithThreshold(builder.executionDataPrunerThreshold),
Expand Down Expand Up @@ -782,7 +812,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
net := builder.AccessNodeConfig.PublicNetworkConfig.Network

var err error
builder.PublicBlobService, err = net.RegisterBlobService(channels.PublicExecutionDataService, builder.ExecutionDataDatastore, opts...)
builder.PublicBlobService, err = net.RegisterBlobService(channels.PublicExecutionDataService, ds, opts...)
if err != nil {
return nil, fmt.Errorf("could not register blob service: %w", err)
}
Expand Down Expand Up @@ -816,15 +846,15 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
// other components from starting while bootstrapping the register db since it may
// take hours to complete.

pdb, err := pStorage.OpenRegisterPebbleDB(builder.registersDBPath)
pdb, err := pstorage.OpenRegisterPebbleDB(builder.registersDBPath)
if err != nil {
return nil, fmt.Errorf("could not open registers db: %w", err)
}
builder.ShutdownFunc(func() error {
return pdb.Close()
})

bootstrapped, err := pStorage.IsBootstrapped(pdb)
bootstrapped, err := pstorage.IsBootstrapped(pdb)
if err != nil {
return nil, fmt.Errorf("could not check if registers db is bootstrapped: %w", err)
}
Expand Down Expand Up @@ -856,7 +886,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}

rootHash := ledger.RootHash(builder.RootSeal.FinalState)
bootstrap, err := pStorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger)
bootstrap, err := pstorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger)
if err != nil {
return nil, fmt.Errorf("could not create registers bootstrap: %w", err)
}
Expand All @@ -869,18 +899,18 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}
}

registers, err := pStorage.NewRegisters(pdb)
registers, err := pstorage.NewRegisters(pdb)
if err != nil {
return nil, fmt.Errorf("could not create registers storage: %w", err)
}

if builder.registerCacheSize > 0 {
cacheType, err := pStorage.ParseCacheType(builder.registerCacheType)
cacheType, err := pstorage.ParseCacheType(builder.registerCacheType)
if err != nil {
return nil, fmt.Errorf("could not parse register cache type: %w", err)
}
cacheMetrics := metrics.NewCacheCollector(builder.RootChainID)
registersCache, err := pStorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics)
registersCache, err := pstorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics)
if err != nil {
return nil, fmt.Errorf("could not create registers cache: %w", err)
}
Expand Down Expand Up @@ -1263,6 +1293,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"execution-data-max-retry-delay",
defaultConfig.executionDataConfig.MaxRetryDelay,
"maximum delay for exponential backoff when fetching execution data fails e.g. 5m")
flags.StringVar(&builder.executionDataDBMode,
"execution-data-db",
defaultConfig.executionDataDBMode,
"[experimental] the DB type for execution datastore. One of [badger, pebble]")
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget,
"execution-data-height-range-target",
defaultConfig.executionDataPrunerHeightRangeTarget,
Expand Down
16 changes: 8 additions & 8 deletions cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ import (

awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
badgerDB "github.com/dgraph-io/badger/v2"
"github.com/dgraph-io/badger/v2"
"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/go-cid"
badger "github.com/ipfs/go-ds-badger2"
badgerds "github.com/ipfs/go-ds-badger2"
"github.com/onflow/cadence"
"github.com/onflow/flow-core-contracts/lib/go/templates"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -154,7 +154,7 @@ type ExecutionNode struct {
executionDataStore execution_data.ExecutionDataStore
toTriggerCheckpoint *atomic.Bool // create the checkpoint trigger to be controlled by admin tool, and listened by the compactor
stopControl *stop.StopControl // stop the node at given block height
executionDataDatastore *badger.Datastore
executionDataDatastore *badgerds.Datastore
executionDataPruner *pruner.Pruner
executionDataBlobstore blobs.Blobstore
executionDataTracker tracker.Storage
Expand Down Expand Up @@ -669,8 +669,8 @@ func (exeNode *ExecutionNode) LoadExecutionDataDatastore(
if err != nil {
return err
}
dsOpts := &badger.DefaultOptions
ds, err := badger.NewDatastore(datastoreDir, dsOpts)
dsOpts := &badgerds.DefaultOptions
ds, err := badgerds.NewDatastore(datastoreDir, dsOpts)
if err != nil {
return err
}
Expand All @@ -691,10 +691,10 @@ func (exeNode *ExecutionNode) LoadExecutionDataGetter(node *NodeConfig) error {
return nil
}

func OpenChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) {
func OpenChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badger.DB, error) {
log := sutil.NewLogger(logger)

opts := badgerDB.
opts := badger.
DefaultOptions(dbPath).
WithKeepL0InMemory(true).
WithLogger(log).
Expand All @@ -708,7 +708,7 @@ func OpenChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, er
WithValueLogFileSize(256 << 23).
WithValueLogMaxEntries(100000) // Default is 1000000

db, err := badgerDB.Open(opts)
db, err := badger.Open(opts)
if err != nil {
return nil, fmt.Errorf("could not open chunk data pack badger db at path %v: %w", dbPath, err)
}
Expand Down
Loading

0 comments on commit d719f5f

Please sign in to comment.