diff --git a/CHANGELOG.md b/CHANGELOG.md index 59784ad6a..90f9e39c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Changelog - [#965](https://github.com/cosmos/iavl/pull/965) Use expected interface for expected IAVL `Logger`. +- [#970](https://github.com/cosmos/iavl/pull/970) Close the pruning process when the nodeDB is closed. ## v1.2.0 May 13, 2024 diff --git a/nodedb.go b/nodedb.go index 3cf403c04..7810acbf2 100644 --- a/nodedb.go +++ b/nodedb.go @@ -2,6 +2,7 @@ package iavl import ( "bytes" + "context" "crypto/sha256" "errors" "fmt" @@ -71,9 +72,12 @@ var ( var errInvalidFastStorageVersion = fmt.Errorf("fast storage version must be in the format %s", fastStorageVersionDelimiter) type nodeDB struct { + ctx context.Context + cancel context.CancelFunc logger Logger mtx sync.Mutex // Read/write lock. + done chan struct{} // Channel to signal that the pruning process is done. db dbm.DB // Persistent node storage. batch dbm.Batch // Batched writing buffer. opts Options // Options to customize for pruning/writing @@ -96,7 +100,10 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB { storeVersion = []byte(defaultStorageVersionValue) } + ctx, cancel := context.WithCancel(context.Background()) ndb := &nodeDB{ + ctx: ctx, + cancel: cancel, logger: lg, db: db, batch: NewBatchWithFlusher(db, opts.FlushThreshold), @@ -113,6 +120,7 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB { } if opts.AsyncPruning { + ndb.done = make(chan struct{}) go ndb.startPruning() } @@ -578,26 +586,32 @@ func (ndb *nodeDB) DeleteVersionsFrom(fromVersion int64) error { // startPruning starts the pruning process. func (ndb *nodeDB) startPruning() { for { - ndb.mtx.Lock() - toVersion := ndb.pruneVersion - ndb.mtx.Unlock() + select { + case <-ndb.ctx.Done(): + ndb.done <- struct{}{} + return + default: + ndb.mtx.Lock() + toVersion := ndb.pruneVersion + ndb.mtx.Unlock() - if toVersion == 0 { - time.Sleep(100 * time.Millisecond) - continue - } + if toVersion == 0 { + time.Sleep(100 * time.Millisecond) + continue + } - if err := ndb.deleteVersionsTo(toVersion); err != nil { - ndb.logger.Error("Error while pruning", "err", err) - time.Sleep(1 * time.Second) - continue - } + if err := ndb.deleteVersionsTo(toVersion); err != nil { + ndb.logger.Error("Error while pruning", "err", err) + time.Sleep(1 * time.Second) + continue + } - ndb.mtx.Lock() - if ndb.pruneVersion <= toVersion { - ndb.pruneVersion = 0 + ndb.mtx.Lock() + if ndb.pruneVersion <= toVersion { + ndb.pruneVersion = 0 + } + ndb.mtx.Unlock() } - ndb.mtx.Unlock() } } @@ -1095,6 +1109,11 @@ func (ndb *nodeDB) Close() error { ndb.mtx.Lock() defer ndb.mtx.Unlock() + ndb.cancel() + if ndb.opts.AsyncPruning { + <-ndb.done // wait for the pruning process to finish + } + if ndb.batch != nil { if err := ndb.batch.Close(); err != nil { return err diff --git a/nodedb_test.go b/nodedb_test.go index be8314241..3e88c354f 100644 --- a/nodedb_test.go +++ b/nodedb_test.go @@ -436,3 +436,12 @@ func TestDeleteVersionsFromNoDeadlock(t *testing.T) { require.Error(t, err, "") require.Contains(t, err.Error(), fmt.Sprintf("unable to delete version %v with 2 active readers", targetVersion+2)) } + +func TestCloseNodeDB(t *testing.T) { + db := dbm.NewMemDB() + defer db.Close() + opts := DefaultOptions() + opts.AsyncPruning = true + ndb := newNodeDB(db, 0, opts, NewNopLogger()) + require.NoError(t, ndb.Close()) +}