From eb6a305a575157e5f27000cf3f44034c399a542e Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Fri, 30 Aug 2024 08:39:57 +0200 Subject: [PATCH 1/5] test --- block/block.go | 8 ++++---- block/manager.go | 4 ++++ block/pruning.go | 22 +++++++++++++++++++--- block/pruning_test.go | 2 +- 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/block/block.go b/block/block.go index 9af9e5c11..d7c575c4d 100644 --- a/block/block.go +++ b/block/block.go @@ -97,11 +97,11 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta types.RollappHeightGauge.Set(float64(block.Header.Height)) // Prune old heights, if requested by ABCI app. + // retainHeight is determined by currentHeight - min-retain-blocks (app.toml config). + // Unless max_age_num_blocks in consensus params is higher than min-retain-block, then max_age_num_blocks will be used instead of min-retain-blocks. + if 0 < retainHeight { - err = m.PruneBlocks(uint64(retainHeight)) - if err != nil { - m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err) - } + m.pruningC <- retainHeight } m.blockCache.Delete(block.Header.Height) diff --git a/block/manager.go b/block/manager.go index 1b68d7261..61de29272 100644 --- a/block/manager.go +++ b/block/manager.go @@ -77,6 +77,9 @@ type Manager struct { // TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA TargetHeight atomic.Uint64 + + // channel used to send the retain height to the pruning background loop + pruningC chan int64 } // NewManager creates new block Manager. @@ -116,6 +119,7 @@ func NewManager( blockCache: &Cache{ cache: make(map[uint64]types.CachedBlock), }, + pruningC: make(chan int64), } err = m.LoadStateOnInit(store, genesis, logger) diff --git a/block/pruning.go b/block/pruning.go index daa3b3dcf..1e1b10301 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -7,21 +7,25 @@ import ( "github.com/dymensionxyz/gerr-cosmos/gerrc" ) -func (m *Manager) PruneBlocks(retainHeight uint64) error { +func (m *Manager) pruneBlocks(retainHeight uint64) error { if m.IsProposer() && m.NextHeightToSubmit() < retainHeight { // do not delete anything that we might submit in future - return fmt.Errorf("cannot prune blocks before they have been submitted: retain height %d: next height to submit: %d: %w", + m.logger.Error("skipping block pruning. next height to submit is previous to retain_height.", "retain_height", retainHeight, "next_submit_height", m.NextHeightToSubmit()) + return fmt.Errorf("skipping block pruning. next height to submit is previous to retain_height.t %d: next height to submit: %d: %w", retainHeight, m.NextHeightToSubmit(), gerrc.ErrInvalidArgument) } + // err := m.P2PClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight) if err != nil { m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err) } + pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight) if err != nil { - return fmt.Errorf("prune block store: %w", err) + m.logger.Error("pruning dymint store", "retain_height", retainHeight, "err", err) + return fmt.Errorf("pruning dymint store: %w", err) } // TODO: prune state/indexer and state/txindexer?? @@ -29,9 +33,21 @@ func (m *Manager) PruneBlocks(retainHeight uint64) error { m.State.BaseHeight = retainHeight _, err = m.Store.SaveState(m.State, nil) if err != nil { + m.logger.Error("saving state.", "retain_height", retainHeight, "err", err) return fmt.Errorf("save state: %w", err) } m.logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight) return nil } + +func (m *Manager) PruningLoop(ctx context.Context) error { + + for { + select { + case <-ctx.Done(): + return ctx.Err() + } + } + return nil +} diff --git a/block/pruning_test.go b/block/pruning_test.go index b3f81e9bb..ef0f11135 100644 --- a/block/pruning_test.go +++ b/block/pruning_test.go @@ -67,6 +67,6 @@ func TestPruningRetainHeight(t *testing.T) { require.Error(err) // cannot prune blocks before they have been submitted } - err = manager.PruneBlocks(validRetainHeight) + err = manager.pruneBlocks(validRetainHeight) require.NoError(err) } From c3c68684228f927d5a94c7cf4bb81fb21ec29e7c Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Fri, 30 Aug 2024 08:42:44 +0200 Subject: [PATCH 2/5] test --- block/pruning.go | 1 + block/pruning_test.go | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/block/pruning.go b/block/pruning.go index 1e1b10301..8fb57fef8 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -48,6 +48,7 @@ func (m *Manager) PruningLoop(ctx context.Context) error { case <-ctx.Done(): return ctx.Err() } + case retainHeight <- } return nil } diff --git a/block/pruning_test.go b/block/pruning_test.go index ef0f11135..b3f81e9bb 100644 --- a/block/pruning_test.go +++ b/block/pruning_test.go @@ -67,6 +67,6 @@ func TestPruningRetainHeight(t *testing.T) { require.Error(err) // cannot prune blocks before they have been submitted } - err = manager.pruneBlocks(validRetainHeight) + err = manager.PruneBlocks(validRetainHeight) require.NoError(err) } From 571ce8874833df2629f05e3ddb3727df73cdb29c Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Fri, 30 Aug 2024 10:35:41 +0200 Subject: [PATCH 3/5] prune loop added --- block/block.go | 6 +++++- block/manager.go | 2 +- block/pruning.go | 9 ++++++--- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/block/block.go b/block/block.go index d7c575c4d..e717c2d61 100644 --- a/block/block.go +++ b/block/block.go @@ -101,7 +101,11 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta // Unless max_age_num_blocks in consensus params is higher than min-retain-block, then max_age_num_blocks will be used instead of min-retain-blocks. if 0 < retainHeight { - m.pruningC <- retainHeight + select { + case m.pruningC <- retainHeight: + default: + m.logger.Error("pruning channel full. skipping pruning", "retainHeight", retainHeight) + } } m.blockCache.Delete(block.Header.Height) diff --git a/block/manager.go b/block/manager.go index 61de29272..3891ee981 100644 --- a/block/manager.go +++ b/block/manager.go @@ -119,7 +119,7 @@ func NewManager( blockCache: &Cache{ cache: make(map[uint64]types.CachedBlock), }, - pruningC: make(chan int64), + pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration. } err = m.LoadStateOnInit(store, genesis, logger) diff --git a/block/pruning.go b/block/pruning.go index 8fb57fef8..aefe07b4b 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -7,7 +7,7 @@ import ( "github.com/dymensionxyz/gerr-cosmos/gerrc" ) -func (m *Manager) pruneBlocks(retainHeight uint64) error { +func (m *Manager) PruneBlocks(retainHeight uint64) error { if m.IsProposer() && m.NextHeightToSubmit() < retainHeight { // do not delete anything that we might submit in future m.logger.Error("skipping block pruning. next height to submit is previous to retain_height.", "retain_height", retainHeight, "next_submit_height", m.NextHeightToSubmit()) return fmt.Errorf("skipping block pruning. next height to submit is previous to retain_height.t %d: next height to submit: %d: %w", @@ -47,8 +47,11 @@ func (m *Manager) PruningLoop(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() + case retainHeight := <-m.pruningC: + err := m.PruneBlocks(uint64(retainHeight)) + if err != nil { + m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err) + } } - case retainHeight <- } - return nil } From b717af6e496899831b4483968bc91fb039f962b4 Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Fri, 30 Aug 2024 10:37:55 +0200 Subject: [PATCH 4/5] add pruneloop call manager --- block/manager.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/block/manager.go b/block/manager.go index 3891ee981..05bba9035 100644 --- a/block/manager.go +++ b/block/manager.go @@ -162,6 +162,11 @@ func (m *Manager) Start(ctx context.Context) error { isProposer := m.IsProposer() m.logger.Info("starting block manager", "proposer", isProposer) + eg, ctx := errgroup.WithContext(ctx) + uerrors.ErrGroupGoLog(eg, m.logger, func() error { + return m.PruningLoop(ctx) + }) + /* ----------------------------- full node mode ----------------------------- */ if !isProposer { // Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel. @@ -204,7 +209,6 @@ func (m *Manager) Start(ctx context.Context) error { // channel to signal sequencer rotation started rotateSequencerC := make(chan string, 1) - eg, ctx := errgroup.WithContext(ctx) uerrors.ErrGroupGoLog(eg, m.logger, func() error { return m.SubmitLoop(ctx, bytesProducedC) }) From 516c82d68039eb76c5a935b15486792506e8e63c Mon Sep 17 00:00:00 2001 From: Sergi Rene Date: Fri, 30 Aug 2024 11:31:53 +0200 Subject: [PATCH 5/5] lint fix --- block/pruning.go | 1 - 1 file changed, 1 deletion(-) diff --git a/block/pruning.go b/block/pruning.go index aefe07b4b..7fa49cae8 100644 --- a/block/pruning.go +++ b/block/pruning.go @@ -42,7 +42,6 @@ func (m *Manager) PruneBlocks(retainHeight uint64) error { } func (m *Manager) PruningLoop(ctx context.Context) error { - for { select { case <-ctx.Done():