diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index dc760ab08e7..12086db5eea 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -11,9 +11,12 @@ import ( "strings" "time" + "github.com/cockroachdb/pebble" + "github.com/dgraph-io/badger/v2" "github.com/ipfs/boxo/bitswap" "github.com/ipfs/go-cid" - badger "github.com/ipfs/go-ds-badger2" + "github.com/ipfs/go-datastore" + badgerds "github.com/ipfs/go-ds-badger2" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/routing" "github.com/onflow/crypto" @@ -69,6 +72,7 @@ import ( "github.com/onflow/flow-go/module/executiondatasync/execution_data" execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" "github.com/onflow/flow-go/module/executiondatasync/pruner" + edstorage "github.com/onflow/flow-go/module/executiondatasync/storage" "github.com/onflow/flow-go/module/executiondatasync/tracker" finalizer "github.com/onflow/flow-go/module/finalizer/consensus" "github.com/onflow/flow-go/module/grpcserver" @@ -106,7 +110,7 @@ import ( "github.com/onflow/flow-go/state/protocol/blocktimer" "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/badger" - pStorage "github.com/onflow/flow-go/storage/pebble" + pstorage "github.com/onflow/flow-go/storage/pebble" "github.com/onflow/flow-go/utils/grpcutils" ) @@ -148,6 +152,7 @@ type AccessNodeConfig struct { rpcMetricsEnabled bool executionDataSyncEnabled bool publicNetworkExecutionDataEnabled bool + executionDataDBMode string executionDataPrunerHeightRangeTarget uint64 executionDataPrunerThreshold uint64 executionDataPruningInterval time.Duration @@ -257,6 +262,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { MaxRetryDelay: edrequester.DefaultMaxRetryDelay, }, executionDataIndexingEnabled: false, + executionDataDBMode: execution_data.ExecutionDataDBModeBadger.String(), executionDataPrunerHeightRangeTarget: 0, executionDataPrunerThreshold: pruner.DefaultThreshold, executionDataPruningInterval: pruner.DefaultPruningInterval, @@ -265,7 +271,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig { scriptExecutorConfig: query.NewDefaultConfig(), scriptExecMinBlock: 0, scriptExecMaxBlock: math.MaxUint64, - registerCacheType: pStorage.CacheTypeTwoQueue.String(), + registerCacheType: pstorage.CacheTypeTwoQueue.String(), registerCacheSize: 0, programCacheSize: 0, checkPayerBalance: false, @@ -315,7 +321,7 @@ type FlowAccessNodeBuilder struct { IndexerDependencies *cmd.DependencyList collectionExecutedMetric module.CollectionExecutedMetric ExecutionDataPruner *pruner.Pruner - ExecutionDataDatastore *badger.Datastore + ExecutionDatastoreManager edstorage.DatastoreManager ExecutionDataTracker tracker.Storage versionControl *version.VersionControl @@ -522,6 +528,7 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu } func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder { + var ds datastore.Batching var bs network.BlobService var processedBlockHeight storage.ConsumerProgress var processedNotifications storage.ConsumerProgress @@ -529,6 +536,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess var execDataDistributor *edrequester.ExecutionDataDistributor var execDataCacheBackend *herocache.BlockExecutionData var executionDataStoreCache *execdatacache.ExecutionDataCache + var executionDataDBMode 
execution_data.ExecutionDataDBMode // setup dependency chain to ensure indexer starts after the requester requesterDependable := module.NewProxiedReadyDoneAware() @@ -547,12 +555,26 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess return err } - builder.ExecutionDataDatastore, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions) + executionDataDBMode, err = execution_data.ParseExecutionDataDBMode(builder.executionDataDBMode) if err != nil { - return err + return fmt.Errorf("could not parse execution data DB mode: %w", err) } + + if executionDataDBMode == execution_data.ExecutionDataDBModePebble { + builder.ExecutionDatastoreManager, err = edstorage.NewPebbleDatastoreManager(datastoreDir, nil) + if err != nil { + return fmt.Errorf("could not create PebbleDatastoreManager for execution data: %w", err) + } + } else { + builder.ExecutionDatastoreManager, err = edstorage.NewBadgerDatastoreManager(datastoreDir, &badgerds.DefaultOptions) + if err != nil { + return fmt.Errorf("could not create BadgerDatastoreManager for execution data: %w", err) + } + } + ds = builder.ExecutionDatastoreManager.Datastore() + builder.ShutdownFunc(func() error { - if err := builder.ExecutionDataDatastore.Close(); err != nil { + if err := builder.ExecutionDatastoreManager.Close(); err != nil { return fmt.Errorf("could not close execution data datastore: %w", err) } return nil @@ -563,13 +585,21 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess Module("processed block height consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. - processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterBlockHeight) + if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { + processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + } else { + processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + } return nil }). Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. - processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterNotification) + if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { + processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification) + } else { + processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification) + } return nil }). Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error { @@ -578,7 +608,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess return nil }). 
Module("execution datastore", func(node *cmd.NodeConfig) error { - builder.ExecutionDataBlobstore = blobs.NewBlobstore(builder.ExecutionDataDatastore) + builder.ExecutionDataBlobstore = blobs.NewBlobstore(ds) builder.ExecutionDataStore = execution_data.NewExecutionDataStore(builder.ExecutionDataBlobstore, execution_data.DefaultSerializer) return nil }). @@ -621,7 +651,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess } var err error - bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, builder.ExecutionDataDatastore, opts...) + bs, err = node.EngineRegistry.RegisterBlobService(channels.ExecutionDataService, ds, opts...) if err != nil { return nil, fmt.Errorf("could not register blob service: %w", err) } @@ -746,7 +776,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess prunerMetrics, builder.ExecutionDataTracker, pruner.WithPruneCallback(func(ctx context.Context) error { - return builder.ExecutionDataDatastore.CollectGarbage(ctx) + return builder.ExecutionDatastoreManager.CollectGarbage(ctx) }), pruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget), pruner.WithThreshold(builder.executionDataPrunerThreshold), @@ -782,7 +812,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess net := builder.AccessNodeConfig.PublicNetworkConfig.Network var err error - builder.PublicBlobService, err = net.RegisterBlobService(channels.PublicExecutionDataService, builder.ExecutionDataDatastore, opts...) + builder.PublicBlobService, err = net.RegisterBlobService(channels.PublicExecutionDataService, ds, opts...) if err != nil { return nil, fmt.Errorf("could not register blob service: %w", err) } @@ -816,7 +846,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess // other components from starting while bootstrapping the register db since it may // take hours to complete. 
- pdb, err := pStorage.OpenRegisterPebbleDB(builder.registersDBPath) + pdb, err := pstorage.OpenRegisterPebbleDB(builder.registersDBPath) if err != nil { return nil, fmt.Errorf("could not open registers db: %w", err) } @@ -824,7 +854,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess return pdb.Close() }) - bootstrapped, err := pStorage.IsBootstrapped(pdb) + bootstrapped, err := pstorage.IsBootstrapped(pdb) if err != nil { return nil, fmt.Errorf("could not check if registers db is bootstrapped: %w", err) } @@ -856,7 +886,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess } rootHash := ledger.RootHash(builder.RootSeal.FinalState) - bootstrap, err := pStorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger) + bootstrap, err := pstorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger) if err != nil { return nil, fmt.Errorf("could not create registers bootstrap: %w", err) } @@ -869,18 +899,18 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess } } - registers, err := pStorage.NewRegisters(pdb) + registers, err := pstorage.NewRegisters(pdb) if err != nil { return nil, fmt.Errorf("could not create registers storage: %w", err) } if builder.registerCacheSize > 0 { - cacheType, err := pStorage.ParseCacheType(builder.registerCacheType) + cacheType, err := pstorage.ParseCacheType(builder.registerCacheType) if err != nil { return nil, fmt.Errorf("could not parse register cache type: %w", err) } cacheMetrics := metrics.NewCacheCollector(builder.RootChainID) - registersCache, err := pStorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics) + registersCache, err := pstorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics) if err != nil { return nil, fmt.Errorf("could not create registers cache: %w", err) } @@ -1263,6 +1293,10 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { "execution-data-max-retry-delay", defaultConfig.executionDataConfig.MaxRetryDelay, "maximum delay for exponential backoff when fetching execution data fails e.g. 5m") + flags.StringVar(&builder.executionDataDBMode, + "execution-data-db", + defaultConfig.executionDataDBMode, + "[experimental] the DB type for execution datastore. 
One of [badger, pebble]") flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, "execution-data-height-range-target", defaultConfig.executionDataPrunerHeightRangeTarget, diff --git a/cmd/execution_builder.go b/cmd/execution_builder.go index 6b276c4ac61..7617f44b1af 100644 --- a/cmd/execution_builder.go +++ b/cmd/execution_builder.go @@ -13,10 +13,10 @@ import ( awsconfig "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" - badgerDB "github.com/dgraph-io/badger/v2" + "github.com/dgraph-io/badger/v2" "github.com/ipfs/boxo/bitswap" "github.com/ipfs/go-cid" - badger "github.com/ipfs/go-ds-badger2" + badgerds "github.com/ipfs/go-ds-badger2" "github.com/onflow/cadence" "github.com/onflow/flow-core-contracts/lib/go/templates" "github.com/rs/zerolog" @@ -154,7 +154,7 @@ type ExecutionNode struct { executionDataStore execution_data.ExecutionDataStore toTriggerCheckpoint *atomic.Bool // create the checkpoint trigger to be controlled by admin tool, and listened by the compactor stopControl *stop.StopControl // stop the node at given block height - executionDataDatastore *badger.Datastore + executionDataDatastore *badgerds.Datastore executionDataPruner *pruner.Pruner executionDataBlobstore blobs.Blobstore executionDataTracker tracker.Storage @@ -669,8 +669,8 @@ func (exeNode *ExecutionNode) LoadExecutionDataDatastore( if err != nil { return err } - dsOpts := &badger.DefaultOptions - ds, err := badger.NewDatastore(datastoreDir, dsOpts) + dsOpts := &badgerds.DefaultOptions + ds, err := badgerds.NewDatastore(datastoreDir, dsOpts) if err != nil { return err } @@ -691,10 +691,10 @@ func (exeNode *ExecutionNode) LoadExecutionDataGetter(node *NodeConfig) error { return nil } -func OpenChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, error) { +func OpenChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badger.DB, error) { log := sutil.NewLogger(logger) - opts := badgerDB. + opts := badger. DefaultOptions(dbPath). WithKeepL0InMemory(true). WithLogger(log). @@ -708,7 +708,7 @@ func OpenChunkDataPackDB(dbPath string, logger zerolog.Logger) (*badgerDB.DB, er WithValueLogFileSize(256 << 23). 
WithValueLogMaxEntries(100000) // Default is 1000000 - db, err := badgerDB.Open(opts) + db, err := badger.Open(opts) if err != nil { return nil, fmt.Errorf("could not open chunk data pack badger db at path %v: %w", dbPath, err) } diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index 772328228ec..ed05d3edfae 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -13,9 +13,12 @@ import ( "strings" "time" + "github.com/cockroachdb/pebble" + "github.com/dgraph-io/badger/v2" "github.com/ipfs/boxo/bitswap" "github.com/ipfs/go-cid" - badger "github.com/ipfs/go-ds-badger2" + "github.com/ipfs/go-datastore" + badgerds "github.com/ipfs/go-ds-badger2" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -68,6 +71,7 @@ import ( "github.com/onflow/flow-go/module/executiondatasync/execution_data" execdatacache "github.com/onflow/flow-go/module/executiondatasync/execution_data/cache" "github.com/onflow/flow-go/module/executiondatasync/pruner" + edstorage "github.com/onflow/flow-go/module/executiondatasync/storage" "github.com/onflow/flow-go/module/executiondatasync/tracker" finalizer "github.com/onflow/flow-go/module/finalizer/consensus" "github.com/onflow/flow-go/module/grpcserver" @@ -107,7 +111,7 @@ import ( "github.com/onflow/flow-go/state/protocol/events/gadgets" "github.com/onflow/flow-go/storage" bstorage "github.com/onflow/flow-go/storage/badger" - pStorage "github.com/onflow/flow-go/storage/pebble" + pstorage "github.com/onflow/flow-go/storage/pebble" "github.com/onflow/flow-go/utils/grpcutils" "github.com/onflow/flow-go/utils/io" ) @@ -154,6 +158,7 @@ type ObserverServiceConfig struct { logTxTimeToFinalizedExecuted bool executionDataSyncEnabled bool executionDataIndexingEnabled bool + executionDataDBMode string executionDataPrunerHeightRangeTarget uint64 executionDataPrunerThreshold uint64 executionDataPruningInterval time.Duration @@ -228,6 +233,7 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig { logTxTimeToFinalizedExecuted: false, executionDataSyncEnabled: false, executionDataIndexingEnabled: false, + executionDataDBMode: execution_data.ExecutionDataDBModeBadger.String(), executionDataPrunerHeightRangeTarget: 0, executionDataPrunerThreshold: pruner.DefaultThreshold, executionDataPruningInterval: pruner.DefaultPruningInterval, @@ -245,7 +251,7 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig { }, scriptExecMinBlock: 0, scriptExecMaxBlock: math.MaxUint64, - registerCacheType: pStorage.CacheTypeTwoQueue.String(), + registerCacheType: pstorage.CacheTypeTwoQueue.String(), registerCacheSize: 0, programCacheSize: 0, } @@ -275,13 +281,13 @@ type ObserverServiceBuilder struct { IndexerDependencies *cmd.DependencyList versionControl *version.VersionControl - ExecutionDataDownloader execution_data.Downloader - ExecutionDataRequester state_synchronization.ExecutionDataRequester - ExecutionDataStore execution_data.ExecutionDataStore - ExecutionDataBlobstore blobs.Blobstore - ExecutionDataPruner *pruner.Pruner - ExecutionDataDatastore *badger.Datastore - ExecutionDataTracker tracker.Storage + ExecutionDataDownloader execution_data.Downloader + ExecutionDataRequester state_synchronization.ExecutionDataRequester + ExecutionDataStore execution_data.ExecutionDataStore + ExecutionDataBlobstore blobs.Blobstore + ExecutionDataPruner *pruner.Pruner + ExecutionDatastoreManager 
edstorage.DatastoreManager + ExecutionDataTracker tracker.Storage RegistersAsyncStore *execution.RegistersAsyncStore Reporter *index.Reporter @@ -678,6 +684,10 @@ func (builder *ObserverServiceBuilder) extraFlags() { flags.BoolVar(&builder.localServiceAPIEnabled, "local-service-api-enabled", defaultConfig.localServiceAPIEnabled, "whether to use local indexed data for api queries") flags.StringVar(&builder.registersDBPath, "execution-state-dir", defaultConfig.registersDBPath, "directory to use for execution-state database") flags.StringVar(&builder.checkpointFile, "execution-state-checkpoint", defaultConfig.checkpointFile, "execution-state checkpoint file") + flags.StringVar(&builder.executionDataDBMode, + "execution-data-db", + defaultConfig.executionDataDBMode, + "[experimental] the DB type for execution datastore. One of [badger, pebble]") // Execution data pruner flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, @@ -1095,6 +1105,7 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) { } func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder { + var ds datastore.Batching var bs network.BlobService var processedBlockHeight storage.ConsumerProgress var processedNotifications storage.ConsumerProgress @@ -1102,6 +1113,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS var execDataDistributor *edrequester.ExecutionDataDistributor var execDataCacheBackend *herocache.BlockExecutionData var executionDataStoreCache *execdatacache.ExecutionDataCache + var executionDataDBMode execution_data.ExecutionDataDBMode // setup dependency chain to ensure indexer starts after the requester requesterDependable := module.NewProxiedReadyDoneAware() @@ -1120,13 +1132,26 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS return err } - builder.ExecutionDataDatastore, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions) + executionDataDBMode, err = execution_data.ParseExecutionDataDBMode(builder.executionDataDBMode) if err != nil { - return err + return fmt.Errorf("could not parse execution data DB mode: %w", err) + } + + if executionDataDBMode == execution_data.ExecutionDataDBModePebble { + builder.ExecutionDatastoreManager, err = edstorage.NewPebbleDatastoreManager(datastoreDir, nil) + if err != nil { + return fmt.Errorf("could not create PebbleDatastoreManager for execution data: %w", err) + } + } else { + builder.ExecutionDatastoreManager, err = edstorage.NewBadgerDatastoreManager(datastoreDir, &badgerds.DefaultOptions) + if err != nil { + return fmt.Errorf("could not create BadgerDatastoreManager for execution data: %w", err) + } } + ds = builder.ExecutionDatastoreManager.Datastore() builder.ShutdownFunc(func() error { - if err := builder.ExecutionDataDatastore.Close(); err != nil { + if err := builder.ExecutionDatastoreManager.Close(); err != nil { return fmt.Errorf("could not close execution data datastore: %w", err) } return nil @@ -1137,13 +1162,21 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS Module("processed block height consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. 
- processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterBlockHeight) + if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { + processedBlockHeight = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + } else { + processedBlockHeight = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterBlockHeight) + } return nil }). Module("processed notifications consumer progress", func(node *cmd.NodeConfig) error { // Note: progress is stored in the datastore's DB since that is where the jobqueue // writes execution data to. - processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDataDatastore.DB, module.ConsumeProgressExecutionDataRequesterNotification) + if executionDataDBMode == execution_data.ExecutionDataDBModeBadger { + processedNotifications = bstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*badger.DB), module.ConsumeProgressExecutionDataRequesterNotification) + } else { + processedNotifications = pstorage.NewConsumerProgress(builder.ExecutionDatastoreManager.DB().(*pebble.DB), module.ConsumeProgressExecutionDataRequesterNotification) + } return nil }). Module("blobservice peer manager dependencies", func(node *cmd.NodeConfig) error { @@ -1152,7 +1185,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS return nil }). Module("execution datastore", func(node *cmd.NodeConfig) error { - builder.ExecutionDataBlobstore = blobs.NewBlobstore(builder.ExecutionDataDatastore) + builder.ExecutionDataBlobstore = blobs.NewBlobstore(ds) builder.ExecutionDataStore = execution_data.NewExecutionDataStore(builder.ExecutionDataBlobstore, execution_data.DefaultSerializer) return nil }). @@ -1187,7 +1220,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS } var err error - bs, err = node.EngineRegistry.RegisterBlobService(channels.PublicExecutionDataService, builder.ExecutionDataDatastore, opts...) + bs, err = node.EngineRegistry.RegisterBlobService(channels.PublicExecutionDataService, ds, opts...) if err != nil { return nil, fmt.Errorf("could not register blob service: %w", err) } @@ -1313,7 +1346,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS prunerMetrics, builder.ExecutionDataTracker, pruner.WithPruneCallback(func(ctx context.Context) error { - return builder.ExecutionDataDatastore.CollectGarbage(ctx) + return builder.ExecutionDatastoreManager.CollectGarbage(ctx) }), pruner.WithHeightRangeTarget(builder.executionDataPrunerHeightRangeTarget), pruner.WithThreshold(builder.executionDataPrunerThreshold), @@ -1342,7 +1375,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS // other components from starting while bootstrapping the register db since it may // take hours to complete. 
- pdb, err := pStorage.OpenRegisterPebbleDB(builder.registersDBPath) + pdb, err := pstorage.OpenRegisterPebbleDB(builder.registersDBPath) if err != nil { return nil, fmt.Errorf("could not open registers db: %w", err) } @@ -1350,7 +1383,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS return pdb.Close() }) - bootstrapped, err := pStorage.IsBootstrapped(pdb) + bootstrapped, err := pstorage.IsBootstrapped(pdb) if err != nil { return nil, fmt.Errorf("could not check if registers db is bootstrapped: %w", err) } @@ -1382,7 +1415,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS } rootHash := ledger.RootHash(builder.RootSeal.FinalState) - bootstrap, err := pStorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger) + bootstrap, err := pstorage.NewRegisterBootstrap(pdb, checkpointFile, checkpointHeight, rootHash, builder.Logger) if err != nil { return nil, fmt.Errorf("could not create registers bootstrap: %w", err) } @@ -1395,18 +1428,18 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS } } - registers, err := pStorage.NewRegisters(pdb) + registers, err := pstorage.NewRegisters(pdb) if err != nil { return nil, fmt.Errorf("could not create registers storage: %w", err) } if builder.registerCacheSize > 0 { - cacheType, err := pStorage.ParseCacheType(builder.registerCacheType) + cacheType, err := pstorage.ParseCacheType(builder.registerCacheType) if err != nil { return nil, fmt.Errorf("could not parse register cache type: %w", err) } cacheMetrics := metrics.NewCacheCollector(builder.RootChainID) - registersCache, err := pStorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics) + registersCache, err := pstorage.NewRegistersCache(registers, cacheType, builder.registerCacheSize, cacheMetrics) if err != nil { return nil, fmt.Errorf("could not create registers cache: %w", err) } diff --git a/go.mod b/go.mod index 334e8b0ed0b..78d250b20c1 100644 --- a/go.mod +++ b/go.mod @@ -34,6 +34,7 @@ require ( github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-badger2 v0.1.3 + github.com/ipfs/go-ds-pebble v0.3.1 github.com/ipfs/go-ipld-format v0.6.0 github.com/ipfs/go-log v1.0.5 github.com/ipfs/go-log/v2 v2.5.1 @@ -330,3 +331,6 @@ require ( // Using custom fork until https://github.com/onflow/flow-go/issues/5338 is resolved replace github.com/ipfs/boxo => github.com/onflow/boxo v0.0.0-20240201202436-f2477b92f483 + +// TODO: remove it when https://github.com/ipfs/go-ds-pebble/pull/36 merged +replace github.com/ipfs/go-ds-pebble v0.3.1 => github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c diff --git a/go.sum b/go.sum index 27eff0d4cdb..1ec823ee51f 100644 --- a/go.sum +++ b/go.sum @@ -2192,6 +2192,8 @@ github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJ github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/flow/protobuf/go/flow v0.4.5 h1:6o+pgYGqwXdEhqSJxu2BdnDXkOQVOkfGAb6IiXB+NPM= github.com/onflow/flow/protobuf/go/flow v0.4.5/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c h1:T0jDCm7k7uqDo26JiiujQ5oryl30itPnlmZQywTu9ng= +github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c/go.mod h1:XYnWtulwJvHVOr2B0WVA/UC3dvRgFevjp8Pn9a3E1xo= github.com/onflow/go-ethereum v1.14.7 
h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/nft-storefront/lib/go/contracts v1.0.0 h1:sxyWLqGm/p4EKT6DUlQESDG1ZNMN9GjPCm1gTq7NGfc= diff --git a/insecure/go.mod b/insecure/go.mod index 84e6a12ad00..4ef70de51ea 100644 --- a/insecure/go.mod +++ b/insecure/go.mod @@ -140,6 +140,7 @@ require ( github.com/ipfs/go-cid v0.4.1 // indirect github.com/ipfs/go-cidutil v0.1.0 // indirect github.com/ipfs/go-ds-badger2 v0.1.3 // indirect + github.com/ipfs/go-ds-pebble v0.3.1 // indirect github.com/ipfs/go-ipfs-delay v0.0.1 // indirect github.com/ipfs/go-ipfs-pq v0.0.3 // indirect github.com/ipfs/go-ipfs-util v0.0.3 // indirect @@ -308,3 +309,6 @@ require ( ) replace github.com/onflow/flow-go => ../ + +// TODO: remove it when https://github.com/ipfs/go-ds-pebble/pull/36 merged +replace github.com/ipfs/go-ds-pebble v0.3.1 => github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c diff --git a/insecure/go.sum b/insecure/go.sum index b131f3fe0c4..be84a925211 100644 --- a/insecure/go.sum +++ b/insecure/go.sum @@ -2180,6 +2180,8 @@ github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJ github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/flow/protobuf/go/flow v0.4.5 h1:6o+pgYGqwXdEhqSJxu2BdnDXkOQVOkfGAb6IiXB+NPM= github.com/onflow/flow/protobuf/go/flow v0.4.5/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c h1:T0jDCm7k7uqDo26JiiujQ5oryl30itPnlmZQywTu9ng= +github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c/go.mod h1:XYnWtulwJvHVOr2B0WVA/UC3dvRgFevjp8Pn9a3E1xo= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/sdks v0.5.1-0.20230912225508-b35402f12bba/go.mod h1:F0dj0EyHC55kknLkeD10js4mo14yTdMotnWMslPirrU= diff --git a/integration/go.mod b/integration/go.mod index 6e5e8a79c31..c27de2bd954 100644 --- a/integration/go.mod +++ b/integration/go.mod @@ -18,6 +18,7 @@ require ( github.com/ipfs/go-cid v0.4.1 github.com/ipfs/go-datastore v0.6.0 github.com/ipfs/go-ds-badger2 v0.1.3 + github.com/ipfs/go-ds-pebble v0.3.1 github.com/libp2p/go-libp2p v0.32.2 github.com/onflow/cadence v1.0.0-preview.42 github.com/onflow/crypto v0.25.1 @@ -358,3 +359,6 @@ require ( replace github.com/onflow/flow-go => ../ replace github.com/onflow/flow-go/insecure => ../insecure + +// TODO: remove it when https://github.com/ipfs/go-ds-pebble/pull/36 merged +replace github.com/ipfs/go-ds-pebble v0.3.1 => github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c diff --git a/integration/go.sum b/integration/go.sum index cfe98f47bfe..e73a0221eb5 100644 --- a/integration/go.sum +++ b/integration/go.sum @@ -2166,6 +2166,8 @@ github.com/onflow/flow-nft/lib/go/templates v1.2.0/go.mod h1:p+2hRvtjLUR3MW1NsoJ github.com/onflow/flow/protobuf/go/flow v0.3.2-0.20231121210617-52ee94b830c2/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= github.com/onflow/flow/protobuf/go/flow v0.4.5 h1:6o+pgYGqwXdEhqSJxu2BdnDXkOQVOkfGAb6IiXB+NPM= github.com/onflow/flow/protobuf/go/flow v0.4.5/go.mod h1:NA2pX2nw8zuaxfKphhKsk00kWLwfd+tv8mS23YXO4Sk= +github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c h1:T0jDCm7k7uqDo26JiiujQ5oryl30itPnlmZQywTu9ng= 
+github.com/onflow/go-ds-pebble v0.0.0-20240731130313-f186539f382c/go.mod h1:XYnWtulwJvHVOr2B0WVA/UC3dvRgFevjp8Pn9a3E1xo= github.com/onflow/go-ethereum v1.14.7 h1:gg3awYqI02e3AypRdpJKEvNTJ6kz/OhAqRti0h54Wlc= github.com/onflow/go-ethereum v1.14.7/go.mod h1:zV14QLrXyYu5ucvcwHUA0r6UaqveqbXaehAVQJlSW+I= github.com/onflow/sdks v0.5.1-0.20230912225508-b35402f12bba/go.mod h1:F0dj0EyHC55kknLkeD10js4mo14yTdMotnWMslPirrU= diff --git a/integration/tests/access/cohort3/execution_state_sync_test.go b/integration/tests/access/cohort3/execution_state_sync_test.go index 7f3f50f47f7..4ff4e77383e 100644 --- a/integration/tests/access/cohort3/execution_state_sync_test.go +++ b/integration/tests/access/cohort3/execution_state_sync_test.go @@ -6,7 +6,9 @@ import ( "path/filepath" "testing" + "github.com/ipfs/go-datastore" badgerds "github.com/ipfs/go-ds-badger2" + pebbleds "github.com/ipfs/go-ds-pebble" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -41,14 +43,20 @@ type ExecutionStateSyncSuite struct { ctx context.Context cancel context.CancelFunc - net *testnet.FlowNetwork + net *testnet.FlowNetwork + executionDataDBMode execution_data.ExecutionDataDBMode } func (s *ExecutionStateSyncSuite) SetupTest() { + s.setup(execution_data.ExecutionDataDBModeBadger) +} + +func (s *ExecutionStateSyncSuite) setup(executionDataDBMode execution_data.ExecutionDataDBMode) { s.log = unittest.LoggerForTest(s.Suite.T(), zerolog.InfoLevel) s.log.Info().Msg("================> SetupTest") s.ctx, s.cancel = context.WithCancel(context.Background()) + s.executionDataDBMode = executionDataDBMode s.buildNetworkConfig() // start the network @@ -82,6 +90,7 @@ func (s *ExecutionStateSyncSuite) buildNetworkConfig() { testnet.WithAdditionalFlag(fmt.Sprintf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir)), testnet.WithAdditionalFlag("--execution-data-retry-delay=1s"), testnet.WithAdditionalFlagf("--public-network-execution-data-sync-enabled=true"), + testnet.WithAdditionalFlag(fmt.Sprintf("--execution-data-db=%s", s.executionDataDBMode.String())), ) // add the ghost (access) node config @@ -121,6 +130,7 @@ func (s *ExecutionStateSyncSuite) buildNetworkConfig() { fmt.Sprintf("--execution-data-dir=%s", testnet.DefaultExecutionDataServiceDir), "--execution-data-sync-enabled=true", "--event-query-mode=execution-nodes-only", + fmt.Sprintf("--execution-data-db=%s", s.executionDataDBMode.String()), }, }} @@ -128,9 +138,13 @@ func (s *ExecutionStateSyncSuite) buildNetworkConfig() { s.net = testnet.PrepareFlowNetwork(s.T(), conf, flow.Localnet) } -// TestHappyPath tests that Execution Nodes generate execution data, and Access Nodes are able to -// successfully sync the data -func (s *ExecutionStateSyncSuite) TestHappyPath() { +// TestBadgerDBHappyPath tests that Execution Nodes generate execution data, and Access Nodes are able to +// successfully sync the data to badger DB +func (s *ExecutionStateSyncSuite) TestBadgerDBHappyPath() { + s.executionStateSyncTest() +} + +func (s *ExecutionStateSyncSuite) executionStateSyncTest() { // Let the network run for this many blocks runBlocks := uint64(60) @@ -203,7 +217,15 @@ func (s *ExecutionStateSyncSuite) TestHappyPath() { } func (s *ExecutionStateSyncSuite) nodeExecutionDataStore(node *testnet.Container) execution_data.ExecutionDataStore { - ds, err := badgerds.NewDatastore(filepath.Join(node.ExecutionDataDBPath(), "blobstore"), &badgerds.DefaultOptions) + var ds datastore.Batching + var err error + dsPath := 
filepath.Join(node.ExecutionDataDBPath(), "blobstore") + + if s.executionDataDBMode == execution_data.ExecutionDataDBModePebble { + ds, err = pebbleds.NewDatastore(dsPath, nil) + } else { + ds, err = badgerds.NewDatastore(dsPath, &badgerds.DefaultOptions) + } require.NoError(s.T(), err, "could not get execution datastore") return execution_data.NewExecutionDataStore(blobs.NewBlobstore(ds), execution_data.DefaultSerializer) diff --git a/integration/tests/access/cohort3/pebble_execution_state_sync_test.go b/integration/tests/access/cohort3/pebble_execution_state_sync_test.go new file mode 100644 index 00000000000..aa345276ff9 --- /dev/null +++ b/integration/tests/access/cohort3/pebble_execution_state_sync_test.go @@ -0,0 +1,27 @@ +package cohort3 + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/onflow/flow-go/module/executiondatasync/execution_data" +) + +func TestPebbleExecutionStateSync(t *testing.T) { + suite.Run(t, new(PebbleExecutionStateSync)) +} + +type PebbleExecutionStateSync struct { + ExecutionStateSyncSuite +} + +func (s *PebbleExecutionStateSync) SetupTest() { + s.setup(execution_data.ExecutionDataDBModePebble) +} + +// TestPebbleDBHappyPath tests that Execution Nodes generate execution data, and Access Nodes are able to +// successfully sync the data to pebble DB +func (s *PebbleExecutionStateSync) TestPebbleDBHappyPath() { + s.executionStateSyncTest() +} diff --git a/module/executiondatasync/execution_data/execution_data.go b/module/executiondatasync/execution_data/execution_data.go index 5c4fdd4edbf..46f6e741738 100644 --- a/module/executiondatasync/execution_data/execution_data.go +++ b/module/executiondatasync/execution_data/execution_data.go @@ -1,10 +1,45 @@ package execution_data import ( + "errors" + "github.com/onflow/flow-go/ledger" "github.com/onflow/flow-go/model/flow" ) +// ExecutionDataDBMode controls which db type to use. +type ExecutionDataDBMode int + +const ( + // ExecutionDataDBModeBadger uses badger db + ExecutionDataDBModeBadger ExecutionDataDBMode = iota + 1 + + // ExecutionDataDBModePebble uses pebble db + ExecutionDataDBModePebble +) + +func ParseExecutionDataDBMode(s string) (ExecutionDataDBMode, error) { + switch s { + case ExecutionDataDBModeBadger.String(): + return ExecutionDataDBModeBadger, nil + case ExecutionDataDBModePebble.String(): + return ExecutionDataDBModePebble, nil + default: + return 0, errors.New("invalid execution data DB mode") + } +} + +func (m ExecutionDataDBMode) String() string { + switch m { + case ExecutionDataDBModeBadger: + return "badger" + case ExecutionDataDBModePebble: + return "pebble" + default: + return "" + } +} + // DefaultMaxBlobSize is the default maximum size of a blob. // This is calibrated to fit within a libp2p message and not exceed the max size recommended by bitswap. const DefaultMaxBlobSize = 1 << 20 // 1MiB diff --git a/module/executiondatasync/storage/badger_datastore_manager.go b/module/executiondatasync/storage/badger_datastore_manager.go new file mode 100644 index 00000000000..3c0cec6ab67 --- /dev/null +++ b/module/executiondatasync/storage/badger_datastore_manager.go @@ -0,0 +1,59 @@ +package storage + +import ( + "context" + + ds "github.com/ipfs/go-datastore" + badgerds "github.com/ipfs/go-ds-badger2" +) + +var _ DatastoreManager = (*BadgerDatastoreManager)(nil) + +// BadgerDatastoreManager wraps the Badger datastore to implement the +// DatastoreManager interface.
It provides access to a Badger datastore +// instance and implements the required methods for managing it. +type BadgerDatastoreManager struct { + ds *badgerds.Datastore +} + +// NewBadgerDatastoreManager creates a new instance of BadgerDatastoreManager +// with the specified datastore path and options. +// +// Parameters: +// - path: The path where the datastore files will be stored. +// - options: Configuration options for the Badger datastore. +// +// No errors are expected during normal operations. +func NewBadgerDatastoreManager(path string, options *badgerds.Options) (*BadgerDatastoreManager, error) { + ds, err := badgerds.NewDatastore(path, options) + if err != nil { + return nil, err + } + + return &BadgerDatastoreManager{ds}, nil +} + +// Datastore provides access to the datastore for performing batched +// read and write operations. +func (b *BadgerDatastoreManager) Datastore() ds.Batching { + return b.ds +} + +// DB returns the raw database object, allowing for more direct +// access to the underlying database features and operations. +func (b *BadgerDatastoreManager) DB() interface{} { + return b.ds.DB +} + +// Close terminates the connection to the datastore and releases +// any associated resources. This method should be called +// when finished using the datastore to ensure proper resource cleanup. +func (b *BadgerDatastoreManager) Close() error { + return b.ds.Close() +} + +// CollectGarbage initiates garbage collection on the Badger datastore +// to reclaim unused space and optimize performance. +func (b *BadgerDatastoreManager) CollectGarbage(ctx context.Context) error { + return b.ds.CollectGarbage(ctx) +} diff --git a/module/executiondatasync/storage/datastore_manager.go b/module/executiondatasync/storage/datastore_manager.go new file mode 100644 index 00000000000..f66d1af2554 --- /dev/null +++ b/module/executiondatasync/storage/datastore_manager.go @@ -0,0 +1,28 @@ +package storage + +import ( + "context" + + "github.com/ipfs/go-datastore" +) + +// DatastoreManager is an interface that defines the methods for managing +// a datastore used for handling execution data. It provides methods to +// access the underlying datastore, perform garbage collection, and handle +// closing operations. Implementations of this interface are expected to +// wrap around different types of datastores. +type DatastoreManager interface { + // Datastore provides access to the datastore for performing batched + // read and write operations. + Datastore() datastore.Batching + // DB returns the raw database object, allowing for more direct + // access to the underlying database features and operations. + DB() interface{} + // Close terminates the connection to the datastore and releases + // any associated resources. This method should be called + // when finished using the datastore to ensure proper resource cleanup. + Close() error + // CollectGarbage initiates garbage collection on the datastore + // to reclaim unused space and optimize performance.
+	CollectGarbage(ctx context.Context) error +} diff --git a/module/executiondatasync/storage/pebble_datastore_manager.go b/module/executiondatasync/storage/pebble_datastore_manager.go new file mode 100644 index 00000000000..dc738a16e17 --- /dev/null +++ b/module/executiondatasync/storage/pebble_datastore_manager.go @@ -0,0 +1,79 @@ +package storage + +import ( + "context" + "fmt" + + "github.com/cockroachdb/pebble" + ds "github.com/ipfs/go-datastore" + pebbleds "github.com/ipfs/go-ds-pebble" + + pstorage "github.com/onflow/flow-go/storage/pebble" +) + +var _ DatastoreManager = (*PebbleDatastoreManager)(nil) + +// PebbleDatastoreManager wraps the PebbleDB to implement the DatastoreManager interface. +type PebbleDatastoreManager struct { + ds *pebbleds.Datastore + db *pebble.DB +} + +// NewPebbleDatastoreManager creates and returns a new instance of PebbleDatastoreManager. +// It initializes the PebbleDB database with the specified path and options. +// If no options are provided, default options are used. +// +// Parameters: +// - path: The path to the directory where the PebbleDB files will be stored. +// - options: Configuration options for the PebbleDB database. If nil, default +// options are applied. +// +// No errors are expected during normal operations. +func NewPebbleDatastoreManager(path string, options *pebble.Options) (*PebbleDatastoreManager, error) { + if options == nil { + cache := pebble.NewCache(pstorage.DefaultPebbleCacheSize) + defer cache.Unref() + options = pstorage.DefaultPebbleOptions(cache, pebble.DefaultComparer) + } + + db, err := pebble.Open(path, options) + if err != nil { + return nil, fmt.Errorf("failed to open db: %w", err) + } + + ds, err := pebbleds.NewDatastore(path, options, pebbleds.WithPebbleDB(db)) + if err != nil { + return nil, fmt.Errorf("could not open pebble datastore: %w", err) + } + + return &PebbleDatastoreManager{ + ds: ds, + db: db, + }, nil +} + +// Datastore provides access to the datastore for performing batched +// read and write operations. +func (p *PebbleDatastoreManager) Datastore() ds.Batching { + return p.ds +} + +// DB returns the raw database object, allowing for more direct +// access to the underlying database features and operations. +func (p *PebbleDatastoreManager) DB() interface{} { + return p.db +} + +// Close terminates the connection to the datastore and releases +// any associated resources. This method should be called +// when finished using the datastore to ensure proper resource cleanup. +func (p *PebbleDatastoreManager) Close() error { + return p.ds.Close() +} + +// CollectGarbage initiates garbage collection on the datastore +// to reclaim unused space and optimize performance.
+func (p *PebbleDatastoreManager) CollectGarbage(_ context.Context) error { + // In PebbleDB, there's no direct equivalent to manual value log garbage collection + return nil +} diff --git a/storage/pebble/consumer_progress.go b/storage/pebble/consumer_progress.go new file mode 100644 index 00000000000..37448bb4b5f --- /dev/null +++ b/storage/pebble/consumer_progress.go @@ -0,0 +1,50 @@ +package pebble + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + + "github.com/onflow/flow-go/storage/pebble/operation" +) + +type ConsumerProgress struct { + db *pebble.DB + consumer string // to distinguish the consumption progress between different consumers +} + +func NewConsumerProgress(db *pebble.DB, consumer string) *ConsumerProgress { + return &ConsumerProgress{ + db: db, + consumer: consumer, + } +} + +func (cp *ConsumerProgress) ProcessedIndex() (uint64, error) { + var processed uint64 + err := operation.RetrieveProcessedIndex(cp.consumer, &processed)(cp.db) + if err != nil { + return 0, fmt.Errorf("failed to retrieve processed index: %w", err) + } + return processed, nil +} + +// InitProcessedIndex inserts the default processed index into the storage layer; this can only be done once. +// Initializing it a second time will return storage.ErrAlreadyExists +func (cp *ConsumerProgress) InitProcessedIndex(defaultIndex uint64) error { + err := operation.InsertProcessedIndex(cp.consumer, defaultIndex)(cp.db) + if err != nil { + return fmt.Errorf("could not update processed index: %w", err) + } + + return nil +} + +func (cp *ConsumerProgress) SetProcessedIndex(processed uint64) error { + err := operation.SetProcessedIndex(cp.consumer, processed)(cp.db) + if err != nil { + return fmt.Errorf("could not update processed index: %w", err) + } + + return nil +} diff --git a/storage/pebble/operation/codes.go b/storage/pebble/operation/codes.go deleted file mode 100644 index 1d9057646c3..00000000000 --- a/storage/pebble/operation/codes.go +++ /dev/null @@ -1,5 +0,0 @@ -package operation - -const ( - codeChunkDataPack = 100 -) diff --git a/storage/pebble/operation/jobs.go b/storage/pebble/operation/jobs.go new file mode 100644 index 00000000000..d18d3f39446 --- /dev/null +++ b/storage/pebble/operation/jobs.go @@ -0,0 +1,19 @@ +package operation + +import ( + "github.com/cockroachdb/pebble" +) + +// RetrieveProcessedIndex returns the processed index for a job consumer +func RetrieveProcessedIndex(jobName string, processed *uint64) func(pebble.Reader) error { + return retrieve(makePrefix(codeJobConsumerProcessed, jobName), processed) +} + +func InsertProcessedIndex(jobName string, processed uint64) func(pebble.Writer) error { + return insert(makePrefix(codeJobConsumerProcessed, jobName), processed) +} + +// SetProcessedIndex updates the processed index for a job consumer with the given index +func SetProcessedIndex(jobName string, processed uint64) func(pebble.Writer) error { + return insert(makePrefix(codeJobConsumerProcessed, jobName), processed) +} diff --git a/storage/pebble/operation/prefix.go b/storage/pebble/operation/prefix.go new file mode 100644 index 00000000000..ef7cdf3c08d --- /dev/null +++ b/storage/pebble/operation/prefix.go @@ -0,0 +1,51 @@ +package operation + +import ( + "encoding/binary" + "fmt" + + "github.com/onflow/flow-go/model/flow" +) + +const ( + //lint:ignore U1000 Ignore unused variable warning + // job queue consumers and producers + codeJobConsumerProcessed = 70 + + // legacy codes (should be cleaned up) + codeChunkDataPack = 100 +) + +func makePrefix(code byte, keys ...interface{}) []byte
{ + prefix := make([]byte, 1) + prefix[0] = code + for _, key := range keys { + prefix = append(prefix, b(key)...) + } + return prefix +} + +func b(v interface{}) []byte { + switch i := v.(type) { + case uint8: + return []byte{i} + case uint32: + b := make([]byte, 4) + binary.BigEndian.PutUint32(b, i) + return b + case uint64: + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, i) + return b + case string: + return []byte(i) + case flow.Role: + return []byte{byte(i)} + case flow.Identifier: + return i[:] + case flow.ChainID: + return []byte(i) + default: + panic(fmt.Sprintf("unsupported type to convert (%T)", v)) + } +} diff --git a/storage/pebble/operation/prefix_test.go b/storage/pebble/operation/prefix_test.go new file mode 100644 index 00000000000..444311ece22 --- /dev/null +++ b/storage/pebble/operation/prefix_test.go @@ -0,0 +1,37 @@ +package operation + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/onflow/flow-go/model/flow" +) + +func TestMakePrefix(t *testing.T) { + + code := byte(0x01) + + u := uint64(1337) + expected := []byte{0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05, 0x39} + actual := makePrefix(code, u) + + assert.Equal(t, expected, actual) + + r := flow.Role(2) + expected = []byte{0x01, 0x02} + actual = makePrefix(code, r) + + assert.Equal(t, expected, actual) + + id := flow.Identifier{0x05, 0x06, 0x07} + expected = []byte{0x01, + 0x05, 0x06, 0x07, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + } + actual = makePrefix(code, id) + + assert.Equal(t, expected, actual) +}
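
Usage note (illustrative, not part of the patch): nodes opt into the new datastore with --execution-data-db=pebble; the default remains badger. The following minimal Go sketch mirrors the selection logic that BuildExecutionSyncComponents now performs in both the access and observer builders. newDatastoreManager is a hypothetical helper name, and the path is a placeholder.

package main

import (
	"fmt"

	badgerds "github.com/ipfs/go-ds-badger2"

	"github.com/onflow/flow-go/module/executiondatasync/execution_data"
	edstorage "github.com/onflow/flow-go/module/executiondatasync/storage"
)

// newDatastoreManager parses the flag value and constructs the matching
// DatastoreManager, as the builders do.
func newDatastoreManager(mode string, dir string) (edstorage.DatastoreManager, error) {
	dbMode, err := execution_data.ParseExecutionDataDBMode(mode)
	if err != nil {
		return nil, fmt.Errorf("could not parse execution data DB mode: %w", err)
	}

	if dbMode == execution_data.ExecutionDataDBModePebble {
		// nil options fall back to DefaultPebbleOptions inside the manager.
		return edstorage.NewPebbleDatastoreManager(dir, nil)
	}
	return edstorage.NewBadgerDatastoreManager(dir, &badgerds.DefaultOptions)
}

func main() {
	mgr, err := newDatastoreManager("pebble", "/tmp/execution-data-demo")
	if err != nil {
		panic(err)
	}
	defer mgr.Close()

	// The rest of the sync pipeline only sees the datastore.Batching interface.
	fmt.Printf("opened %T\n", mgr.Datastore())
}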
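The DB() interface{} escape hatch exists because consumer progress is still backend-specific. Below is a hedged sketch, assuming a helper that is not in this diff, of how a caller can collapse the repeated if/else on the DB mode from both builders into one place by type-asserting the raw handle; the real builders pass constants such as module.ConsumeProgressExecutionDataRequesterBlockHeight as the consumer name.

package example

import (
	"fmt"

	"github.com/cockroachdb/pebble"
	"github.com/dgraph-io/badger/v2"

	edstorage "github.com/onflow/flow-go/module/executiondatasync/storage"
	"github.com/onflow/flow-go/storage"
	bstorage "github.com/onflow/flow-go/storage/badger"
	pstorage "github.com/onflow/flow-go/storage/pebble"
)

// consumerProgress picks the ConsumerProgress implementation that matches
// the concrete DB behind the DatastoreManager.
func consumerProgress(mgr edstorage.DatastoreManager, consumer string) (storage.ConsumerProgress, error) {
	switch db := mgr.DB().(type) {
	case *badger.DB:
		return bstorage.NewConsumerProgress(db, consumer), nil
	case *pebble.DB:
		return pstorage.NewConsumerProgress(db, consumer), nil
	default:
		return nil, fmt.Errorf("unsupported execution data DB type %T", db)
	}
}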
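Finally, a small lifecycle sketch for the new pebble-backed ConsumerProgress, assuming a scratch directory; the path and consumer name are illustrative only (production code opens the DB through PebbleDatastoreManager and uses the module.ConsumeProgress* constants).

package main

import (
	"fmt"

	"github.com/cockroachdb/pebble"

	pstorage "github.com/onflow/flow-go/storage/pebble"
)

func main() {
	// Open a throwaway Pebble DB for the demo.
	db, err := pebble.Open("/tmp/consumer-progress-demo", &pebble.Options{})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	cp := pstorage.NewConsumerProgress(db, "demo_consumer")

	// First-time setup writes the default index under
	// makePrefix(codeJobConsumerProcessed, consumer).
	if err := cp.InitProcessedIndex(100); err != nil {
		panic(err)
	}

	// The job consumer advances the index as work completes.
	if err := cp.SetProcessedIndex(101); err != nil {
		panic(err)
	}

	processed, err := cp.ProcessedIndex()
	if err != nil {
		panic(err)
	}
	fmt.Println("processed index:", processed) // 101
}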