Skip to content

Commit

Permalink
Merge pull request onflow#6345 from The-K-R-O-K/AndriiSlisarchuk/6066…
Browse files Browse the repository at this point in the history
…-enforce-prune-threshold

[Access] Enforce prune threshold
  • Loading branch information
Guitarheroua authored Oct 7, 2024
2 parents 3beefb8 + 9faddd9 commit 3602aeb
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 33 deletions.
10 changes: 9 additions & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ type AccessNodeConfig struct {
checkPayerBalanceMode string
versionControlEnabled bool
stopControlEnabled bool
registerDBPruneThreshold uint64
}

type PublicNetworkConfig struct {
Expand Down Expand Up @@ -280,6 +281,7 @@ func DefaultAccessNodeConfig() *AccessNodeConfig {
checkPayerBalanceMode: accessNode.Disabled.String(),
versionControlEnabled: true,
stopControlEnabled: false,
registerDBPruneThreshold: pruner.DefaultThreshold,
}
}

Expand Down Expand Up @@ -905,7 +907,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}
}

registers, err := pstorage.NewRegisters(pdb)
registers, err := pstorage.NewRegisters(pdb, builder.registerDBPruneThreshold)
if err != nil {
return nil, fmt.Errorf("could not create registers storage: %w", err)
}
Expand Down Expand Up @@ -1421,6 +1423,12 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"check-payer-balance-mode",
defaultConfig.checkPayerBalanceMode,
"flag for payer balance validation that specifies whether or not to enforce the balance check. one of [disabled(default), warn, enforce]")

// Register DB Pruning
flags.Uint64Var(&builder.registerDBPruneThreshold,
"registerdb-pruning-threshold",
defaultConfig.registerDBPruneThreshold,
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))
}).ValidateFlags(func() error {
if builder.supportsObserver && (builder.PublicNetworkConfig.BindAddress == cmd.NotSet || builder.PublicNetworkConfig.BindAddress == "") {
return errors.New("public-network-address must be set if supports-observer is true")
Expand Down
2 changes: 1 addition & 1 deletion cmd/execution_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,7 @@ func (exeNode *ExecutionNode) LoadRegisterStore(
return fmt.Errorf("could not import registers from checkpoint: %w", err)
}
}
diskStore, err := storagepebble.NewRegisters(pebbledb)
diskStore, err := storagepebble.NewRegisters(pebbledb, storagepebble.PruningDisabled)
if err != nil {
return fmt.Errorf("could not create registers storage: %w", err)
}
Expand Down
20 changes: 14 additions & 6 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ type ObserverServiceConfig struct {
registerCacheType string
registerCacheSize uint
programCacheSize uint
registerDBPruneThreshold uint64
}

// DefaultObserverServiceConfig defines all the default values for the ObserverServiceConfig
Expand Down Expand Up @@ -252,11 +253,12 @@ func DefaultObserverServiceConfig() *ObserverServiceConfig {
RetryDelay: edrequester.DefaultRetryDelay,
MaxRetryDelay: edrequester.DefaultMaxRetryDelay,
},
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pstorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
scriptExecMinBlock: 0,
scriptExecMaxBlock: math.MaxUint64,
registerCacheType: pstorage.CacheTypeTwoQueue.String(),
registerCacheSize: 0,
programCacheSize: 0,
registerDBPruneThreshold: pruner.DefaultThreshold,
}
}

Expand Down Expand Up @@ -810,6 +812,12 @@ func (builder *ObserverServiceBuilder) extraFlags() {
"program-cache-size",
defaultConfig.programCacheSize,
"[experimental] number of blocks to cache for cadence programs. use 0 to disable cache. default: 0. Note: this is an experimental feature and may cause nodes to become unstable under certain workloads. Use with caution.")

// Register DB Pruning
flags.Uint64Var(&builder.registerDBPruneThreshold,
"registerdb-pruning-threshold",
defaultConfig.registerDBPruneThreshold,
fmt.Sprintf("specifies the number of blocks below the latest stored block height to keep in register db. default: %d", defaultConfig.registerDBPruneThreshold))
}).ValidateFlags(func() error {
if builder.executionDataSyncEnabled {
if builder.executionDataConfig.FetchTimeout <= 0 {
Expand Down Expand Up @@ -1436,7 +1444,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
}
}

registers, err := pstorage.NewRegisters(pdb)
registers, err := pstorage.NewRegisters(pdb, builder.registerDBPruneThreshold)
if err != nil {
return nil, fmt.Errorf("could not create registers storage: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion engine/access/rpc/backend/script_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (s *ScriptExecutorSuite) SetupTest() {

s.dbDir = unittest.TempDir(s.T())
db := pebbleStorage.NewBootstrappedRegistersWithPathForTest(s.T(), s.dbDir, s.height, s.height)
pebbleRegisters, err := pebbleStorage.NewRegisters(db)
pebbleRegisters, err := pebbleStorage.NewRegisters(db, pebbleStorage.PruningDisabled)
s.Require().NoError(err)
s.registerIndex = pebbleRegisters

Expand Down
2 changes: 1 addition & 1 deletion engine/testutil/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func ExecutionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, ide
checkpointHeight := uint64(0)
require.NoError(t, esbootstrap.ImportRegistersFromCheckpoint(node.Log, checkpointFile, checkpointHeight, matchTrie.RootHash(), pebbledb, 2))

diskStore, err := storagepebble.NewRegisters(pebbledb)
diskStore, err := storagepebble.NewRegisters(pebbledb, storagepebble.PruningDisabled)
require.NoError(t, err)

reader := finalizedreader.NewFinalizedReader(headersStorage, checkpointHeight)
Expand Down
2 changes: 1 addition & 1 deletion module/execution/scripts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (s *scriptTestSuite) SetupTest() {

s.dbDir = unittest.TempDir(s.T())
db := pebbleStorage.NewBootstrappedRegistersWithPathForTest(s.T(), s.dbDir, s.height, s.height)
pebbleRegisters, err := pebbleStorage.NewRegisters(db)
pebbleRegisters, err := pebbleStorage.NewRegisters(db, pebbleStorage.PruningDisabled)
s.Require().NoError(err)
s.registerIndex = pebbleRegisters

Expand Down
6 changes: 3 additions & 3 deletions storage/pebble/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_Happy(t *testing.T) {
require.NoError(t, err)

// create registers instance and check values
reg, err := NewRegisters(pb)
reg, err := NewRegisters(pb, PruningDisabled)
require.NoError(t, err)

require.Equal(t, reg.LatestHeight(), rootHeight)
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_Empty(t *testing.T) {
require.NoError(t, err)

// create registers instance and check values
reg, err := NewRegisters(pb)
reg, err := NewRegisters(pb, PruningDisabled)
require.NoError(t, err)

require.Equal(t, reg.LatestHeight(), rootHeight)
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestRegisterBootstrap_IndexCheckpointFile_MultipleBatch(t *testing.T) {
require.NoError(t, err)

// create registers instance and check values
reg, err := NewRegisters(pb)
reg, err := NewRegisters(pb, PruningDisabled)
require.NoError(t, err)

require.Equal(t, reg.LatestHeight(), rootHeight)
Expand Down
2 changes: 1 addition & 1 deletion storage/pebble/open.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewBootstrappedRegistersWithPath(dir string) (*Registers, *pebble.DB, error
if err != nil {
return nil, nil, fmt.Errorf("failed to initialize pebble db: %w", err)
}
registers, err := NewRegisters(db)
registers, err := NewRegisters(db, PruningDisabled)
if err != nil {
if errors.Is(err, storage.ErrNotBootstrapped) {
// closing the db if not bootstrapped
Expand Down
2 changes: 1 addition & 1 deletion storage/pebble/open_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestNewBootstrappedRegistersWithPath(t *testing.T) {
firstHeight := uint64(10)
require.NoError(t, initHeights(db2, firstHeight))

registers, err := NewRegisters(db2)
registers, err := NewRegisters(db2, PruningDisabled)
require.NoError(t, err)
require.Equal(t, firstHeight, registers.FirstHeight())
require.Equal(t, firstHeight, registers.LatestHeight())
Expand Down
65 changes: 51 additions & 14 deletions storage/pebble/registers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package pebble
import (
"encoding/binary"
"fmt"
"math"

"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
Expand All @@ -15,29 +16,39 @@ import (
// Registers library that implements pebble storage for registers
// given a pebble instance with root block and root height populated
type Registers struct {
db *pebble.DB
firstHeight uint64
latestHeight *atomic.Uint64
db *pebble.DB
firstHeight uint64
latestHeight *atomic.Uint64
pruneThreshold uint64
}

// PruningDisabled represents the absence of a pruning threshold.
const PruningDisabled = math.MaxUint64

var _ storage.RegisterIndex = (*Registers)(nil)

// NewRegisters takes a populated pebble instance with LatestHeight and FirstHeight set.
// return storage.ErrNotBootstrapped if they those two keys are unavailable as it implies a uninitialized state
// return other error if database is in a corrupted state
func NewRegisters(db *pebble.DB) (*Registers, error) {
func NewRegisters(db *pebble.DB, pruneThreshold uint64) (*Registers, error) {
// check height keys and populate cache. These two variables will have been set
firstHeight, latestHeight, err := ReadHeightsFromBootstrappedDB(db)
if err != nil {
// first height is found, but latest height is not found, this means that the DB is in a corrupted state
return nil, fmt.Errorf("unable to initialize register storage, latest height unavailable in db: %w", err)
}

// If no pruning threshold is provided, disable pruning.
if pruneThreshold == 0 {
pruneThreshold = PruningDisabled
}

// All registers between firstHeight and lastHeight have been indexed
return &Registers{
db: db,
firstHeight: firstHeight,
latestHeight: atomic.NewUint64(latestHeight),
db: db,
firstHeight: firstHeight,
latestHeight: atomic.NewUint64(latestHeight),
pruneThreshold: pruneThreshold,
}, nil
}

Expand All @@ -53,12 +64,14 @@ func (s *Registers) Get(
reg flow.RegisterID,
height uint64,
) (flow.RegisterValue, error) {
latestHeight := s.latestHeight.Load()
if height > latestHeight || height < s.firstHeight {
return nil, errors.Wrap(
storage.ErrHeightNotIndexed,
fmt.Sprintf("height %d not indexed, indexed range is [%d-%d]", height, s.firstHeight, latestHeight),
)
latestHeight := s.LatestHeight()
if height > latestHeight {
return nil, fmt.Errorf("height %d not indexed, latestHeight: %d, %w", height, latestHeight, storage.ErrHeightNotIndexed)
}

firstHeight := s.calculateFirstHeight(latestHeight)
if height < firstHeight {
return nil, fmt.Errorf("height %d not indexed, indexed range: [%d-%d], %w", height, firstHeight, latestHeight, storage.ErrHeightNotIndexed)
}
key := newLookupKey(height, reg)
return s.lookupRegister(key.Bytes())
Expand Down Expand Up @@ -132,6 +145,7 @@ func (s *Registers) Store(
if err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}

s.latestHeight.Store(height)

return nil
Expand All @@ -144,7 +158,30 @@ func (s *Registers) LatestHeight() uint64 {

// FirstHeight first indexed height found in the store, typically root block for the spork
func (s *Registers) FirstHeight() uint64 {
return s.firstHeight
return s.calculateFirstHeight(s.LatestHeight())
}

// calculateFirstHeight calculates the first indexed height that is stored in the register index, based on the
// latest height and the configured pruning threshold. If the latest height is below the pruning threshold, the
// first indexed height will be the same as the initial height when the store was initialized. If the pruning
// threshold has been exceeded, the first indexed height is adjusted accordingly.
//
// Parameters:
// - latestHeight: the most recent height of complete registers available.
//
// Returns:
// - The first indexed height, either as the initialized height or adjusted for pruning.
func (s *Registers) calculateFirstHeight(latestHeight uint64) uint64 {
if latestHeight < s.pruneThreshold {
return s.firstHeight
}

pruneHeight := latestHeight - s.pruneThreshold
if pruneHeight < s.firstHeight {
return s.firstHeight
}

return pruneHeight
}

func firstStoredHeight(db *pebble.DB) (uint64, error) {
Expand Down
4 changes: 2 additions & 2 deletions storage/pebble/registers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestRegisters_Initialize(t *testing.T) {
t.Parallel()
p, dir := unittest.TempPebbleDBWithOpts(t, nil)
// fail on blank database without FirstHeight and LastHeight set
_, err := NewRegisters(p)
_, err := NewRegisters(p, PruningDisabled)
require.Error(t, err)
// verify the error type
require.True(t, errors.Is(err, storage.ErrNotBootstrapped))
Expand Down Expand Up @@ -296,7 +296,7 @@ func Benchmark_PayloadStorage(b *testing.B) {
dbpath := path.Join(b.TempDir(), "benchmark1.db")
db, err := pebble.Open(dbpath, opts)
require.NoError(b, err)
s, err := NewRegisters(db)
s, err := NewRegisters(db, PruningDisabled)
require.NoError(b, err)
require.NotNil(b, s)

Expand Down
2 changes: 1 addition & 1 deletion storage/pebble/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
func RunWithRegistersStorageAtInitialHeights(tb testing.TB, first uint64, latest uint64, f func(r *Registers)) {
unittest.RunWithTempDir(tb, func(dir string) {
db := NewBootstrappedRegistersWithPathForTest(tb, dir, first, latest)
r, err := NewRegisters(db)
r, err := NewRegisters(db, PruningDisabled)
require.NoError(tb, err)

f(r)
Expand Down

0 comments on commit 3602aeb

Please sign in to comment.