Skip to content

Commit

Permalink
Updated according to comments, added additional test for tracker storage
Browse files Browse the repository at this point in the history
  • Loading branch information
UlyanaAndrukhiv committed Jun 25, 2024
1 parent 473d2f5 commit fa5fc2e
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 28 deletions.
20 changes: 11 additions & 9 deletions cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,6 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu
}

func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder {
var ds *badger.Datastore
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
var processedNotifications storage.ConsumerProgress
Expand All @@ -537,13 +536,12 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return err
}

ds, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions)
builder.ExecutionDataDatastore, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions)
if err != nil {
return err
}
builder.ExecutionDataDatastore = ds
builder.ShutdownFunc(func() error {
if err := ds.Close(); err != nil {
if err := builder.ExecutionDataDatastore.Close(); err != nil {
return fmt.Errorf("could not close execution data datastore: %w", err)
}
return nil
Expand Down Expand Up @@ -645,8 +643,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
}

downloaderOpts = []execution_data.DownloaderOption{
execution_data.WithExecutionDataTracker(builder.ExecutionDataTracker),
execution_data.WithHeaders(node.Storage.Headers),
execution_data.WithExecutionDataTracker(builder.ExecutionDataTracker, node.Storage.Headers),
}
}

Expand Down Expand Up @@ -723,7 +720,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess
return builder.ExecutionDataRequester, nil
}).
Component("execution data pruner", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// by default, pruning is disabled
if builder.executionDataPrunerHeightRangeTarget == 0 {
return &module.NoopReadyDoneAware{}, nil
}
Expand Down Expand Up @@ -1262,8 +1258,14 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
"execution-data-max-retry-delay",
defaultConfig.executionDataConfig.MaxRetryDelay,
"maximum delay for exponential backoff when fetching execution data fails e.g. 5m")
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, "execution-data-height-range-target", defaultConfig.executionDataPrunerHeightRangeTarget, "target height range size used to limit the amount of Execution Data kept on disk")
flags.Uint64Var(&builder.executionDataPrunerThreshold, "execution-data-height-range-threshold", defaultConfig.executionDataPrunerThreshold, "height threshold used to trigger Execution Data pruning")
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget,
"execution-data-height-range-target",
defaultConfig.executionDataPrunerHeightRangeTarget,
"number of blocks of Execution Data to keep on disk. older data is pruned")
flags.Uint64Var(&builder.executionDataPrunerThreshold,
"execution-data-height-range-threshold",
defaultConfig.executionDataPrunerThreshold,
"number of unpruned blocks of Execution Data beyond the height range target to allow before pruning")

// Execution State Streaming API
flags.Uint32Var(&builder.stateStreamConf.ExecutionDataCacheSize, "execution-data-cache-size", defaultConfig.stateStreamConf.ExecutionDataCacheSize, "block execution data cache size")
Expand Down
20 changes: 11 additions & 9 deletions cmd/observer/node_builder/observer_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,8 +669,14 @@ func (builder *ObserverServiceBuilder) extraFlags() {
flags.StringVar(&builder.checkpointFile, "execution-state-checkpoint", defaultConfig.checkpointFile, "execution-state checkpoint file")

// Execution data pruner
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, "execution-data-height-range-target", defaultConfig.executionDataPrunerHeightRangeTarget, "target height range size used to limit the amount of Execution Data kept on disk")
flags.Uint64Var(&builder.executionDataPrunerThreshold, "execution-data-height-range-threshold", defaultConfig.executionDataPrunerThreshold, "height threshold used to trigger Execution Data pruning")
flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget,
"execution-data-height-range-target",
defaultConfig.executionDataPrunerHeightRangeTarget,
"number of blocks of Execution Data to keep on disk. older data is pruned")
flags.Uint64Var(&builder.executionDataPrunerThreshold,
"execution-data-height-range-threshold",
defaultConfig.executionDataPrunerThreshold,
"number of unpruned blocks of Execution Data beyond the height range target to allow before pruning")

// ExecutionDataRequester config
flags.BoolVar(&builder.executionDataSyncEnabled,
Expand Down Expand Up @@ -1074,7 +1080,6 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) {
}

func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder {
var ds *badger.Datastore
var bs network.BlobService
var processedBlockHeight storage.ConsumerProgress
var processedNotifications storage.ConsumerProgress
Expand All @@ -1098,14 +1103,13 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return err
}

ds, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions)
builder.ExecutionDataDatastore, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions)
if err != nil {
return err
}

builder.ExecutionDataDatastore = ds
builder.ShutdownFunc(func() error {
if err := ds.Close(); err != nil {
if err := builder.ExecutionDataDatastore.Close(); err != nil {
return fmt.Errorf("could not close execution data datastore: %w", err)
}
return nil
Expand Down Expand Up @@ -1199,8 +1203,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
}

downloaderOpts = []execution_data.DownloaderOption{
execution_data.WithExecutionDataTracker(builder.ExecutionDataTracker),
execution_data.WithHeaders(node.Storage.Headers),
execution_data.WithExecutionDataTracker(builder.ExecutionDataTracker, node.Storage.Headers),
}
}

Expand Down Expand Up @@ -1278,7 +1281,6 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS
return builder.ExecutionDataRequester, nil
}).
Component("execution data pruner", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) {
// by default, pruning is disabled
if builder.executionDataPrunerHeightRangeTarget == 0 {
return &module.NoopReadyDoneAware{}, nil
}
Expand Down
10 changes: 2 additions & 8 deletions module/executiondatasync/execution_data/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,10 @@ func WithSerializer(serializer Serializer) DownloaderOption {
}
}

// WithExecutionDataTracker configures the execution data tracker for the downloader
func WithExecutionDataTracker(storage tracker.Storage) DownloaderOption {
// WithExecutionDataTracker configures the execution data tracker and the storage headers for the downloader
func WithExecutionDataTracker(storage tracker.Storage, headers storage.Headers) DownloaderOption {
return func(d *downloader) {
d.storage = storage
}
}

// WithHeaders configures the storage headers for the downloader
func WithHeaders(headers storage.Headers) DownloaderOption {
return func(d *downloader) {
d.headers = headers
}
}
Expand Down
4 changes: 2 additions & 2 deletions module/executiondatasync/tracker/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func makeLatestHeightKey(c cid.Cid) []byte {

// makeUint64Value encodes v as an 8-byte big-endian value.
//
// Big-endian is used so that the lexicographic (byte-wise) ordering of the
// encoded values matches the numeric ordering of heights; badger iterates
// keys in ascending byte order, and pruning depends on records being visited
// in ascending height order (see TestAscendingOrderOfRecords).
func makeUint64Value(v uint64) []byte {
	value := make([]byte, 8)
	binary.BigEndian.PutUint64(value, v)
	return value
}

Expand All @@ -84,7 +84,7 @@ func getUint64Value(item *badger.Item) (uint64, error) {
return 0, err
}

return binary.LittleEndian.Uint64(value), nil
return binary.BigEndian.Uint64(value), nil
}

// getBatchItemCountLimit returns the maximum number of items that can be included in a single batch
Expand Down
63 changes: 63 additions & 0 deletions module/executiondatasync/tracker/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,66 @@ func TestPruneNonLatestHeight(t *testing.T) {
})
require.NoError(t, err)
}

// TestAscendingOrderOfRecords verifies that tracked records are iterated in
// ascending height order during pruning: every CID tracked at or below the
// pruned height (together with its tracking data) is removed from the
// database, while all higher records survive.
func TestAscendingOrderOfRecords(t *testing.T) {
	expectedPrunedCIDs := make(map[cid.Cid]struct{})
	storage, err := OpenStorage(t.TempDir(), 0, zerolog.Nop(), WithPruneCallback(func(c cid.Cid) error {
		_, ok := expectedPrunedCIDs[c]
		assert.True(t, ok, "unexpected CID pruned: %s", c.String())
		delete(expectedPrunedCIDs, c)
		return nil
	}))
	require.NoError(t, err)

	// track blobs at heights 1, 2 and 256; pruning up to height 1 exercises
	// the key ordering, and only the height-1 CID is expected to be pruned
	blobAtHeight1 := randomCid()
	blobAtHeight2 := randomCid()
	blobAtHeight256 := randomCid()
	expectedPrunedCIDs[blobAtHeight1] = struct{}{}

	err = storage.Update(func(tbf TrackBlobsFn) error {
		require.NoError(t, tbf(1, blobAtHeight1))
		require.NoError(t, tbf(2, blobAtHeight2))
		// the height-256 record must not be visited before the height-1
		// record while pruning iterates the keys
		require.NoError(t, tbf(256, blobAtHeight256))
		return nil
	})
	require.NoError(t, err)

	require.NoError(t, storage.PruneUpToHeight(1))

	prunedHeight, err := storage.GetPrunedHeight()
	require.NoError(t, err)
	assert.Equal(t, uint64(1), prunedHeight)

	// the prune callback fired exactly once, for the height-1 CID
	assert.Len(t, expectedPrunedCIDs, 0)

	err = storage.db.View(func(txn *badger.Txn) error {
		// the blob record and latest-height entry at height 1 are gone
		_, err := txn.Get(makeBlobRecordKey(1, blobAtHeight1))
		assert.ErrorIs(t, err, badger.ErrKeyNotFound)
		_, err = txn.Get(makeLatestHeightKey(blobAtHeight1))
		assert.ErrorIs(t, err, badger.ErrKeyNotFound)

		// every record above the pruned height is retained
		retained := []struct {
			height uint64
			c      cid.Cid
		}{
			{2, blobAtHeight2},
			{256, blobAtHeight256},
		}
		for _, r := range retained {
			_, err = txn.Get(makeBlobRecordKey(r.height, r.c))
			assert.NoError(t, err)
			_, err = txn.Get(makeLatestHeightKey(r.c))
			assert.NoError(t, err)
		}

		return nil
	})
	require.NoError(t, err)
}

0 comments on commit fa5fc2e

Please sign in to comment.