Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): pruning command #1063

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,11 @@ func (m *Manager) syncFromSettlement() error {
// The SL hasn't got any batches for this chain yet.
m.logger.Info("No batches for chain found in SL.")
m.LastSubmittedHeight.Store(uint64(m.Genesis.InitialHeight - 1))
m.State.LastSubmittedHeight = uint64(m.Genesis.InitialHeight - 1)
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return fmt.Errorf("save state: %w", err)
}
return nil
}

Expand All @@ -264,6 +269,11 @@ func (m *Manager) syncFromSettlement() error {
return err
}
m.LastSubmittedHeight.Store(res.EndHeight)
m.State.LastSubmittedHeight = res.EndHeight
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return fmt.Errorf("save state: %w", err)
}
err = m.syncToTargetHeight(res.EndHeight)
m.UpdateTargetHeight(res.EndHeight)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions block/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func NewStateFromGenesis(genDoc *tmtypes.GenesisDoc) (*types.State, error) {
LastHeightConsensusParamsChanged: genDoc.InitialHeight,
}
s.SetHeight(0)
s.LastSubmittedHeight = 0
copy(s.AppHash[:], genDoc.AppHash)

err = s.SetConsensusParamsFromGenesis(genDoc.AppState)
Expand Down
5 changes: 5 additions & 0 deletions block/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ func (m *Manager) SubmitBatch(batch *types.Batch) error {

types.RollappHubHeightGauge.Set(float64(batch.EndHeight()))
m.LastSubmittedHeight.Store(batch.EndHeight())
m.State.LastSubmittedHeight = batch.EndHeight()
_, err = m.Store.SaveState(m.State, nil)
if err != nil {
return fmt.Errorf("save state: %w", err)
}
return nil
}

Expand Down
3 changes: 3 additions & 0 deletions indexers/blockindexer/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ type BlockIndexer interface {
// Search performs a query for block heights that match a given BeginBlock
// and Endblock event search criteria.
Search(ctx context.Context, q *query.Query) ([]int64, error)

// Prune function used by indexer pruning command
Prune(to int64) (uint64, error)
}
68 changes: 68 additions & 0 deletions indexers/blockindexer/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,71 @@ func (idx *BlockerIndexer) indexEvents(batch store.KVBatch, events []abci.Event,

return nil
}

func (idx *BlockerIndexer) Prune(to int64) (uint64, error) {
eventsPruned := uint64(0)

batch := idx.store.NewBatch()

flush := func(batch store.KVBatch, height int64) error {
err := batch.Commit()
if err != nil {
return fmt.Errorf("flush batch to disk: height %d: %w", height, err)
}
return nil
}

for h := int64(1); h < to; h++ {
key, err := heightKey(h)
if err != nil {
return eventsPruned, fmt.Errorf("create block height index key: %w", err)
}
_, err = idx.store.Get(key)
if err != nil {
continue
}
if err := idx.store.Delete(key); err != nil {
continue
}
eventsPruned++
if eventsPruned%1000 == 0 && eventsPruned > 0 {
err := flush(batch, h)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()
}
}

it := idx.store.PrefixIterator([]byte{})
defer it.Discard()

for ; it.Valid(); it.Next() {
key := it.Key()[13:]
height, err := parseHeightFromEventKey(key)
if err != nil || height >= to {
continue
}
err = batch.Delete(key)
if err != nil {
continue
}

eventsPruned++
if eventsPruned%1000 == 0 && eventsPruned > 0 {
err := flush(batch, to)
if err != nil {
return 0, err
}
batch.Discard()
batch = idx.store.NewBatch()
}
}
err := flush(batch, to)
if err != nil {
return 0, err
}

return eventsPruned, nil
}
18 changes: 18 additions & 0 deletions indexers/blockindexer/kv/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,24 @@ func parseValueFromEventKey(key []byte) (string, error) {
return eventValue, nil
}

func parseHeightFromEventKey(key []byte) (int64, error) {
var (
compositeKey, typ, eventValue string
height int64
)

remaining, err := orderedcode.Parse(string(key), &compositeKey, &eventValue, &height, &typ)
if err != nil {
return -1, fmt.Errorf("parse event key: %w", err)
}

if len(remaining) != 0 {
return -1, fmt.Errorf("unexpected remainder in key: %s", remaining)
}

return height, nil
}

func lookForHeight(conditions []query.Condition) (int64, bool) {
for _, c := range conditions {
if c.CompositeKey == types.BlockHeightKey && c.Op == query.OpEqual {
Expand Down
4 changes: 4 additions & 0 deletions indexers/blockindexer/null/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ func (idx *BlockerIndexer) Index(types.EventDataNewBlockHeader) error {
func (idx *BlockerIndexer) Search(ctx context.Context, q *query.Query) ([]int64, error) {
return []int64{}, nil
}

func (idx *BlockerIndexer) Prune(to int64) (uint64, error) {
return 0, nil
}
3 changes: 3 additions & 0 deletions indexers/txindex/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type TxIndexer interface {

// Search allows you to query for transactions.
Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error)

// Prune function used by indexer pruning command
Prune(to int64) (uint64, error)
}

// Batch groups together multiple Index operations to be performed at the same time.
Expand Down
48 changes: 48 additions & 0 deletions indexers/txindex/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,45 @@ LOOP:
return filteredHashes
}

func (txi *TxIndex) Prune(to int64) (uint64, error) {
eventsPruned := uint64(0)

for h := int64(1); h < to; h++ {
prefix := []byte(fmt.Sprintf("%s/%d/", tmtypes.TxHeightKey, h))

it := txi.store.PrefixIterator(prefix)
defer it.Discard()
for ; it.Valid(); it.Next() {
err := txi.store.Delete(it.Key())
if err != nil {
continue
}
err = txi.store.Delete(it.Value())
if err != nil {
continue
}
}
}
it := txi.store.PrefixIterator([]byte{})
defer it.Discard()

for ; it.Valid(); it.Next() {
key := it.Key()[13:]
height, err := extractHeightFromKey(key)
if err != nil || height >= to {
continue
}
err = txi.store.Delete(key)
if err != nil {
continue
}

eventsPruned++
}

return eventsPruned, nil
}

// Keys

func isTagKey(key []byte) bool {
Expand All @@ -564,6 +603,15 @@ func extractValueFromKey(key []byte) string {
return parts[1]
}

func extractHeightFromKey(key []byte) (int64, error) {
parts := strings.SplitN(string(key), tagKeySeparator, 3)
height, err := strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return 0, err
}
return height, nil
}

func keyForEvent(key string, value []byte, result *abci.TxResult) []byte {
return []byte(fmt.Sprintf("%s/%s/%d/%d",
key,
Expand Down
4 changes: 4 additions & 0 deletions indexers/txindex/null/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,7 @@ func (txi *TxIndex) Index(result *abci.TxResult) error {
func (txi *TxIndex) Search(ctx context.Context, q *query.Query) ([]*abci.TxResult, error) {
return []*abci.TxResult{}, nil
}

func (txi *TxIndex) Prune(to int64) (uint64, error) {
return 0, nil
}
2 changes: 1 addition & 1 deletion proto/types/dymint/state.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ message State {

SequencerSet sequencerSet = 18 [(gogoproto.nullable) = false];
RollappConsensusParams consensus_params = 19 [(gogoproto.nullable) = false];

uint64 last_submitted_height = 20;
}

//rollapp params defined in genesis and updated via gov proposal
Expand Down
Loading
Loading