Skip to content

Commit

Permalink
Merge branch 'srene/1038-pruning-bg' into srene/rolapp_params_refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
srene committed Aug 30, 2024
2 parents 304d68a + 516c82d commit 829ef48
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 6 deletions.
10 changes: 7 additions & 3 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,14 @@ func (m *Manager) applyBlock(block *types.Block, commit *types.Commit, blockMeta
types.RollappHeightGauge.Set(float64(block.Header.Height))

// Prune old heights, if requested by ABCI app.
// retainHeight is determined by currentHeight - min-retain-blocks (app.toml config).
// Unless max_age_num_blocks in consensus params is higher than min-retain-block, then max_age_num_blocks will be used instead of min-retain-blocks.

if 0 < retainHeight {
err = m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("prune blocks", "retain_height", retainHeight, "err", err)
select {
case m.pruningC <- retainHeight:
default:
m.logger.Error("pruning channel full. skipping pruning", "retainHeight", retainHeight)
}
}

Expand Down
10 changes: 9 additions & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type Manager struct {

// TargetHeight holds the value of the current highest block seen from either p2p (probably higher) or the DA
TargetHeight atomic.Uint64

// channel used to send the retain height to the pruning background loop
pruningC chan int64
}

// NewManager creates new block Manager.
Expand Down Expand Up @@ -116,6 +119,7 @@ func NewManager(
blockCache: &Cache{
cache: make(map[uint64]types.CachedBlock),
},
pruningC: make(chan int64, 10), // use of buffered channel to avoid blocking applyBlock thread. In case channel is full, pruning will be skipped, but the retain height can be pruned in the next iteration.
}

err = m.LoadStateOnInit(store, genesis, logger)
Expand Down Expand Up @@ -158,6 +162,11 @@ func (m *Manager) Start(ctx context.Context) error {
isProposer := m.IsProposer()
m.logger.Info("starting block manager", "proposer", isProposer)

eg, ctx := errgroup.WithContext(ctx)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.PruningLoop(ctx)
})

/* ----------------------------- full node mode ----------------------------- */
if !isProposer {
// Full-nodes can sync from DA but it is not necessary to wait for it, since it can sync from P2P as well in parallel.
Expand Down Expand Up @@ -200,7 +209,6 @@ func (m *Manager) Start(ctx context.Context) error {
// channel to signal sequencer rotation started
rotateSequencerC := make(chan string, 1)

eg, ctx := errgroup.WithContext(ctx)
uerrors.ErrGroupGoLog(eg, m.logger, func() error {
return m.SubmitLoop(ctx, bytesProducedC)
})
Expand Down
23 changes: 21 additions & 2 deletions block/pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,48 @@ import (

func (m *Manager) PruneBlocks(retainHeight uint64) error {
if m.IsProposer() && m.NextHeightToSubmit() < retainHeight { // do not delete anything that we might submit in future
return fmt.Errorf("cannot prune blocks before they have been submitted: retain height %d: next height to submit: %d: %w",
m.logger.Error("skipping block pruning. next height to submit is previous to retain_height.", "retain_height", retainHeight, "next_submit_height", m.NextHeightToSubmit())
return fmt.Errorf("skipping block pruning. next height to submit is previous to retain_height.t %d: next height to submit: %d: %w",
retainHeight,
m.NextHeightToSubmit(),
gerrc.ErrInvalidArgument)
}

//
err := m.P2PClient.RemoveBlocks(context.Background(), m.State.BaseHeight, retainHeight)
if err != nil {
m.logger.Error("pruning blocksync store", "retain_height", retainHeight, "err", err)
}

pruned, err := m.Store.PruneBlocks(m.State.BaseHeight, retainHeight)
if err != nil {
return fmt.Errorf("prune block store: %w", err)
m.logger.Error("pruning dymint store", "retain_height", retainHeight, "err", err)
return fmt.Errorf("pruning dymint store: %w", err)
}

// TODO: prune state/indexer and state/txindexer??

m.State.BaseHeight = retainHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
m.logger.Error("saving state.", "retain_height", retainHeight, "err", err)
return fmt.Errorf("save state: %w", err)
}

m.logger.Info("pruned blocks", "pruned", pruned, "retain_height", retainHeight)
return nil
}

func (m *Manager) PruningLoop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case retainHeight := <-m.pruningC:
err := m.PruneBlocks(uint64(retainHeight))
if err != nil {
m.logger.Error("pruning blocks", "retainHeight", retainHeight, "err", err)
}
}
}
}

0 comments on commit 829ef48

Please sign in to comment.