Skip to content

Commit

Permalink
fix: close the pruning process properly (cosmos#970)
Browse files Browse the repository at this point in the history
  • Loading branch information
cool-develope authored Jul 26, 2024
1 parent d561baf commit 7939ef9
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [#952](https://github.com/cosmos/iavl/pull/952) Add `DeleteVersionsFrom(int64)` API.
- [#961](https://github.com/cosmos/iavl/pull/961) Add new `GetLatestVersion` API to get the latest version.
- [#965](https://github.com/cosmos/iavl/pull/965) Use expected interface for expected IAVL `Logger`.
- [#970](https://github.com/cosmos/iavl/pull/970) Close the pruning process when the nodeDB is closed.

## v1.2.0 May 13, 2024

Expand Down
51 changes: 35 additions & 16 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package iavl

import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
Expand Down Expand Up @@ -72,9 +73,12 @@ var (
var errInvalidFastStorageVersion = fmt.Errorf("fast storage version must be in the format <storage version>%s<latest fast cache version>", fastStorageVersionDelimiter)

type nodeDB struct {
ctx context.Context
cancel context.CancelFunc
logger Logger

mtx sync.Mutex // Read/write lock.
done chan struct{} // Channel to signal that the pruning process is done.
db dbm.DB // Persistent node storage.
batch corestore.Batch // Batched writing buffer.
opts Options // Options to customize for pruning/writing
Expand All @@ -97,7 +101,10 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB {
storeVersion = []byte(defaultStorageVersionValue)
}

ctx, cancel := context.WithCancel(context.Background())
ndb := &nodeDB{
ctx: ctx,
cancel: cancel,
logger: lg,
db: db,
batch: NewBatchWithFlusher(db, opts.FlushThreshold),
Expand All @@ -114,6 +121,7 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB {
}

if opts.AsyncPruning {
ndb.done = make(chan struct{})
go ndb.startPruning()
}

Expand Down Expand Up @@ -577,26 +585,32 @@ func (ndb *nodeDB) DeleteVersionsFrom(fromVersion int64) error {
// startPruning starts the pruning process.
func (ndb *nodeDB) startPruning() {
for {
ndb.mtx.Lock()
toVersion := ndb.pruneVersion
ndb.mtx.Unlock()
select {
case <-ndb.ctx.Done():
ndb.done <- struct{}{}
return
default:
ndb.mtx.Lock()
toVersion := ndb.pruneVersion
ndb.mtx.Unlock()

if toVersion == 0 {
time.Sleep(100 * time.Millisecond)
continue
}
if toVersion == 0 {
time.Sleep(100 * time.Millisecond)
continue
}

if err := ndb.deleteVersionsTo(toVersion); err != nil {
ndb.logger.Error("Error while pruning", "err", err)
time.Sleep(1 * time.Second)
continue
}
if err := ndb.deleteVersionsTo(toVersion); err != nil {
ndb.logger.Error("Error while pruning", "err", err)
time.Sleep(1 * time.Second)
continue
}

ndb.mtx.Lock()
if ndb.pruneVersion <= toVersion {
ndb.pruneVersion = 0
ndb.mtx.Lock()
if ndb.pruneVersion <= toVersion {
ndb.pruneVersion = 0
}
ndb.mtx.Unlock()
}
ndb.mtx.Unlock()
}
}

Expand Down Expand Up @@ -1095,6 +1109,11 @@ func (ndb *nodeDB) Close() error {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()

ndb.cancel()
if ndb.opts.AsyncPruning {
<-ndb.done // wait for the pruning process to finish
}

if ndb.batch != nil {
if err := ndb.batch.Close(); err != nil {
return err
Expand Down
9 changes: 9 additions & 0 deletions nodedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,3 +436,12 @@ func TestDeleteVersionsFromNoDeadlock(t *testing.T) {
require.Error(t, err, "")
require.Contains(t, err.Error(), fmt.Sprintf("unable to delete version %v with 2 active readers", targetVersion+2))
}

func TestCloseNodeDB(t *testing.T) {
db := dbm.NewMemDB()
defer db.Close()
opts := DefaultOptions()
opts.AsyncPruning = true
ndb := newNodeDB(db, 0, opts, NewNopLogger())
require.NoError(t, ndb.Close())
}

0 comments on commit 7939ef9

Please sign in to comment.