From fa5fc2ef02cb1f002a1697d8540c996f76b08dc3 Mon Sep 17 00:00:00 2001 From: UlyanaAndrukhiv Date: Mon, 24 Jun 2024 17:33:04 +0300 Subject: [PATCH] Updated according to comments, added additional test for tracker storage --- .../node_builder/access_node_builder.go | 20 +++--- cmd/observer/node_builder/observer_builder.go | 20 +++--- .../execution_data/downloader.go | 10 +-- module/executiondatasync/tracker/storage.go | 4 +- .../executiondatasync/tracker/storage_test.go | 63 +++++++++++++++++++ 5 files changed, 89 insertions(+), 28 deletions(-) diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index efb8bf86301..6ec79994829 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -513,7 +513,6 @@ func (builder *FlowAccessNodeBuilder) BuildConsensusFollower() *FlowAccessNodeBu } func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccessNodeBuilder { - var ds *badger.Datastore var bs network.BlobService var processedBlockHeight storage.ConsumerProgress var processedNotifications storage.ConsumerProgress @@ -537,13 +536,12 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess return err } - ds, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions) + builder.ExecutionDataDatastore, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions) if err != nil { return err } - builder.ExecutionDataDatastore = ds builder.ShutdownFunc(func() error { - if err := ds.Close(); err != nil { + if err := builder.ExecutionDataDatastore.Close(); err != nil { return fmt.Errorf("could not close execution data datastore: %w", err) } return nil @@ -645,8 +643,7 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess } downloaderOpts = []execution_data.DownloaderOption{ - execution_data.WithExecutionDataTracker(builder.ExecutionDataTracker), - execution_data.WithHeaders(node.Storage.Headers), + execution_data.WithExecutionDataTracker(builder.ExecutionDataTracker, node.Storage.Headers), } } @@ -723,7 +720,6 @@ func (builder *FlowAccessNodeBuilder) BuildExecutionSyncComponents() *FlowAccess return builder.ExecutionDataRequester, nil }). Component("execution data pruner", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { - // by default, pruning is disabled if builder.executionDataPrunerHeightRangeTarget == 0 { return &module.NoopReadyDoneAware{}, nil } @@ -1262,8 +1258,14 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { "execution-data-max-retry-delay", defaultConfig.executionDataConfig.MaxRetryDelay, "maximum delay for exponential backoff when fetching execution data fails e.g. 5m") - flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, "execution-data-height-range-target", defaultConfig.executionDataPrunerHeightRangeTarget, "target height range size used to limit the amount of Execution Data kept on disk") - flags.Uint64Var(&builder.executionDataPrunerThreshold, "execution-data-height-range-threshold", defaultConfig.executionDataPrunerThreshold, "height threshold used to trigger Execution Data pruning") + flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, + "execution-data-height-range-target", + defaultConfig.executionDataPrunerHeightRangeTarget, + "number of blocks of Execution Data to keep on disk. older data is pruned") + flags.Uint64Var(&builder.executionDataPrunerThreshold, + "execution-data-height-range-threshold", + defaultConfig.executionDataPrunerThreshold, + "number of unpruned blocks of Execution Data beyond the height range target to allow before pruning") // Execution State Streaming API flags.Uint32Var(&builder.stateStreamConf.ExecutionDataCacheSize, "execution-data-cache-size", defaultConfig.stateStreamConf.ExecutionDataCacheSize, "block execution data cache size") diff --git a/cmd/observer/node_builder/observer_builder.go b/cmd/observer/node_builder/observer_builder.go index c64d95e026a..d7eae4c133b 100644 --- a/cmd/observer/node_builder/observer_builder.go +++ b/cmd/observer/node_builder/observer_builder.go @@ -669,8 +669,14 @@ func (builder *ObserverServiceBuilder) extraFlags() { flags.StringVar(&builder.checkpointFile, "execution-state-checkpoint", defaultConfig.checkpointFile, "execution-state checkpoint file") // Execution data pruner - flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, "execution-data-height-range-target", defaultConfig.executionDataPrunerHeightRangeTarget, "target height range size used to limit the amount of Execution Data kept on disk") - flags.Uint64Var(&builder.executionDataPrunerThreshold, "execution-data-height-range-threshold", defaultConfig.executionDataPrunerThreshold, "height threshold used to trigger Execution Data pruning") + flags.Uint64Var(&builder.executionDataPrunerHeightRangeTarget, + "execution-data-height-range-target", + defaultConfig.executionDataPrunerHeightRangeTarget, + "number of blocks of Execution Data to keep on disk. older data is pruned") + flags.Uint64Var(&builder.executionDataPrunerThreshold, + "execution-data-height-range-threshold", + defaultConfig.executionDataPrunerThreshold, + "number of unpruned blocks of Execution Data beyond the height range target to allow before pruning") // ExecutionDataRequester config flags.BoolVar(&builder.executionDataSyncEnabled, @@ -1074,7 +1080,6 @@ func (builder *ObserverServiceBuilder) Build() (cmd.Node, error) { } func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverServiceBuilder { - var ds *badger.Datastore var bs network.BlobService var processedBlockHeight storage.ConsumerProgress var processedNotifications storage.ConsumerProgress @@ -1098,14 +1103,13 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS return err } - ds, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions) + builder.ExecutionDataDatastore, err = badger.NewDatastore(datastoreDir, &badger.DefaultOptions) if err != nil { return err } - builder.ExecutionDataDatastore = ds builder.ShutdownFunc(func() error { - if err := ds.Close(); err != nil { + if err := builder.ExecutionDataDatastore.Close(); err != nil { return fmt.Errorf("could not close execution data datastore: %w", err) } return nil @@ -1199,8 +1203,7 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS } downloaderOpts = []execution_data.DownloaderOption{ - execution_data.WithExecutionDataTracker(builder.ExecutionDataTracker), - execution_data.WithHeaders(node.Storage.Headers), + execution_data.WithExecutionDataTracker(builder.ExecutionDataTracker, node.Storage.Headers), } } @@ -1278,7 +1281,6 @@ func (builder *ObserverServiceBuilder) BuildExecutionSyncComponents() *ObserverS return builder.ExecutionDataRequester, nil }). Component("execution data pruner", func(node *cmd.NodeConfig) (module.ReadyDoneAware, error) { - // by default, pruning is disabled if builder.executionDataPrunerHeightRangeTarget == 0 { return &module.NoopReadyDoneAware{}, nil } diff --git a/module/executiondatasync/execution_data/downloader.go b/module/executiondatasync/execution_data/downloader.go index 3d9f7332639..8ae1a000d1c 100644 --- a/module/executiondatasync/execution_data/downloader.go +++ b/module/executiondatasync/execution_data/downloader.go @@ -42,16 +42,10 @@ func WithSerializer(serializer Serializer) DownloaderOption { } } -// WithExecutionDataTracker configures the execution data tracker for the downloader -func WithExecutionDataTracker(storage tracker.Storage) DownloaderOption { +// WithExecutionDataTracker configures the execution data tracker and the storage headers for the downloader +func WithExecutionDataTracker(storage tracker.Storage, headers storage.Headers) DownloaderOption { return func(d *downloader) { d.storage = storage - } -} - -// WithHeaders configures the storage headers for the downloader -func WithHeaders(headers storage.Headers) DownloaderOption { - return func(d *downloader) { d.headers = headers } } diff --git a/module/executiondatasync/tracker/storage.go b/module/executiondatasync/tracker/storage.go index 8aa879d4e1a..c7677f79ca7 100644 --- a/module/executiondatasync/tracker/storage.go +++ b/module/executiondatasync/tracker/storage.go @@ -74,7 +74,7 @@ func makeLatestHeightKey(c cid.Cid) []byte { func makeUint64Value(v uint64) []byte { value := make([]byte, 8) - binary.LittleEndian.PutUint64(value, v) + binary.BigEndian.PutUint64(value, v) return value } @@ -84,7 +84,7 @@ func getUint64Value(item *badger.Item) (uint64, error) { return 0, err } - return binary.LittleEndian.Uint64(value), nil + return binary.BigEndian.Uint64(value), nil } // getBatchItemCountLimit returns the maximum number of items that can be included in a single batch diff --git a/module/executiondatasync/tracker/storage_test.go b/module/executiondatasync/tracker/storage_test.go index b0078065642..76c7b613ab6 100644 --- a/module/executiondatasync/tracker/storage_test.go +++ b/module/executiondatasync/tracker/storage_test.go @@ -120,3 +120,66 @@ func TestPruneNonLatestHeight(t *testing.T) { }) require.NoError(t, err) } + +// TestAscendingOrderOfRecords tests that order of data is ascending and all CIDs appearing at or below the pruned +// height, and their associated tracking data, should be removed from the database. +func TestAscendingOrderOfRecords(t *testing.T) { + expectedPrunedCIDs := make(map[cid.Cid]struct{}) + storageDir := t.TempDir() + storage, err := OpenStorage(storageDir, 0, zerolog.Nop(), WithPruneCallback(func(c cid.Cid) error { + _, ok := expectedPrunedCIDs[c] + assert.True(t, ok, "unexpected CID pruned: %s", c.String()) + delete(expectedPrunedCIDs, c) + return nil + })) + require.NoError(t, err) + + // c1 is for height 1, + // c2 is for height 2, + // c3 is for height 256 + // pruning up to height 1 will check if order of the records is ascending, c1 should be pruned + c1 := randomCid() + expectedPrunedCIDs[c1] = struct{}{} + c2 := randomCid() + c3 := randomCid() + + require.NoError(t, storage.Update(func(tbf TrackBlobsFn) error { + require.NoError(t, tbf(1, c1)) + require.NoError(t, tbf(2, c2)) + // It is important to check if the record with height 256 does not precede + // the record with height 1 during pruning. + require.NoError(t, tbf(256, c3)) + + return nil + })) + require.NoError(t, storage.PruneUpToHeight(1)) + + prunedHeight, err := storage.GetPrunedHeight() + require.NoError(t, err) + assert.Equal(t, uint64(1), prunedHeight) + + assert.Len(t, expectedPrunedCIDs, 0) + + err = storage.db.View(func(txn *badger.Txn) error { + // expected that blob record with height 1 was removed + _, err := txn.Get(makeBlobRecordKey(1, c1)) + assert.ErrorIs(t, err, badger.ErrKeyNotFound) + _, err = txn.Get(makeLatestHeightKey(c1)) + assert.ErrorIs(t, err, badger.ErrKeyNotFound) + + // expected that blob record with height 2 exists + _, err = txn.Get(makeBlobRecordKey(2, c2)) + assert.NoError(t, err) + _, err = txn.Get(makeLatestHeightKey(c2)) + assert.NoError(t, err) + + // expected that blob record with height 256 exists + _, err = txn.Get(makeBlobRecordKey(256, c3)) + assert.NoError(t, err) + _, err = txn.Get(makeLatestHeightKey(c3)) + assert.NoError(t, err) + + return nil + }) + require.NoError(t, err) +}