Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: sync missing features from v1.2.x to the default branch #969

Merged
merged 4 commits into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- [#952](https://github.com/cosmos/iavl/pull/952) Add `DeleteVersionsFrom(int64)` API.
- [#961](https://github.com/cosmos/iavl/pull/961) Add new `GetLatestVersion` API to get the latest version.
- [#965](https://github.com/cosmos/iavl/pull/965) Use expected interface for expected IAVL `Logger`.

## v1.2.0 May 13, 2024

Expand Down
13 changes: 0 additions & 13 deletions migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os/exec"
"path"
"testing"
"time"

"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -254,18 +253,6 @@ func TestPruning(t *testing.T) {
}
}

// Wait for pruning to finish
for i := 0; i < 100; i++ {
_, _, err := tree.SaveVersion()
require.NoError(t, err)
isLeacy, err := tree.ndb.hasLegacyVersion(int64(legacyVersion))
require.NoError(t, err)
if !isLeacy {
break
}
// Simulate the consensus state update
time.Sleep(500 * time.Millisecond)
}
// Reload the tree
tree = NewMutableTree(db, 0, false, NewNopLogger())
versions := tree.AvailableVersions()
Expand Down
16 changes: 12 additions & 4 deletions mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,17 @@ func (tree *MutableTree) GetVersioned(key []byte, version int64) ([]byte, error)
return nil, nil
}

// SetCommitting sets a flag to indicate that the tree is in the process of being saved.
// This is used to prevent parallel writing from async pruning.
func (tree *MutableTree) SetCommitting() {
tree.ndb.SetCommitting()
}

// UnsetCommitting unsets the flag to indicate that the tree is no longer in the process of being saved.
func (tree *MutableTree) UnsetCommitting() {
tree.ndb.UnsetCommitting()
}

// SaveVersion saves a new tree version to disk, based on the current state of
// the tree. Returns the hash and new version number.
func (tree *MutableTree) SaveVersion() ([]byte, int64, error) {
Expand Down Expand Up @@ -1025,10 +1036,7 @@ func (tree *MutableTree) saveNewNodes(version int64) error {
var recursiveAssignKey func(*Node) ([]byte, error)
recursiveAssignKey = func(node *Node) ([]byte, error) {
if node.nodeKey != nil {
if node.nodeKey.nonce != 0 {
return node.nodeKey.GetKey(), nil
}
return node.hash, nil
return node.GetKey(), nil
}
nonce++
node.nodeKey = &NodeKey{
Expand Down
160 changes: 124 additions & 36 deletions nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@ const (
defaultStorageVersionValue = "1.0.0"
fastStorageVersionValue = "1.1.0"
fastNodeCacheSize = 100000

// This is used to avoid the case which pruning blocks the main process.
deleteBatchCount = 1000
deletePauseDuration = 100 * time.Millisecond
)

var (
Expand Down Expand Up @@ -86,9 +82,12 @@ type nodeDB struct {
storageVersion string // Storage version
firstVersion int64 // First version of nodeDB.
latestVersion int64 // Latest version of nodeDB.
pruneVersion int64 // Version to prune up to.
legacyLatestVersion int64 // Latest version of nodeDB in legacy format.
nodeCache cache.Cache // Cache for nodes in the regular tree that consists of key-value pairs at any version.
fastNodeCache cache.Cache // Cache for nodes in the fast index that represents only key-value pairs at the latest version.
isCommitting bool // Flag to indicate that the nodeDB is committing.
chCommitting chan struct{} // Channel to signal that the committing is done.
}

func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB {
Expand All @@ -98,19 +97,27 @@ func newNodeDB(db dbm.DB, cacheSize int, opts Options, lg Logger) *nodeDB {
storeVersion = []byte(defaultStorageVersionValue)
}

return &nodeDB{
ndb := &nodeDB{
logger: lg,
db: db,
batch: NewBatchWithFlusher(db, opts.FlushThreshold),
opts: opts,
firstVersion: 0,
latestVersion: 0, // initially invalid
legacyLatestVersion: 0,
pruneVersion: 0,
nodeCache: cache.New(cacheSize),
fastNodeCache: cache.New(fastNodeCacheSize),
versionReaders: make(map[int64]uint32, 8),
storageVersion: string(storeVersion),
chCommitting: make(chan struct{}, 1),
}

if opts.AsyncPruning {
go ndb.startPruning()
}

return ndb
}

// GetNode gets a node from memory or disk. If it is an inner node, it does not
Expand Down Expand Up @@ -243,6 +250,33 @@ func (ndb *nodeDB) SaveFastNodeNoCache(node *fastnode.Node) error {
return ndb.saveFastNodeUnlocked(node, false)
}

// SetCommitting sets the committing flag to true.
// This is used to let the pruning process know that the nodeDB is committing.
func (ndb *nodeDB) SetCommitting() {
for len(ndb.chCommitting) > 0 {
<-ndb.chCommitting
}
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
ndb.isCommitting = true
}

// UnsetCommitting sets the committing flag to false.
// This is used to let the pruning process know that the nodeDB is done committing.
func (ndb *nodeDB) UnsetCommitting() {
ndb.mtx.Lock()
ndb.isCommitting = false
ndb.mtx.Unlock()
ndb.chCommitting <- struct{}{}
}

// IsCommitting returns true if the nodeDB is committing, false otherwise.
func (ndb *nodeDB) IsCommitting() bool {
ndb.mtx.Lock()
defer ndb.mtx.Unlock()
return ndb.isCommitting
}

// SetFastStorageVersionToBatch sets storage version to fast where the version is
// 1.1.0-<version of the current live state>. Returns error if storage version is incorrect or on
// db error, nil otherwise. Requires changes to be committed after to be persisted.
Expand Down Expand Up @@ -330,6 +364,37 @@ func (ndb *nodeDB) Has(nk []byte) (bool, error) {
return ndb.db.Has(ndb.nodeKey(nk))
}

// deleteFromPruning deletes the orphan nodes from the pruning process.
func (ndb *nodeDB) deleteFromPruning(key []byte) error {
if ndb.IsCommitting() {
// if the nodeDB is committing, the pruning process will be done after the committing.
<-ndb.chCommitting
}

ndb.mtx.Lock()
defer ndb.mtx.Unlock()
return ndb.batch.Delete(key)
}

// saveNodeFromPruning saves the orphan nodes to the pruning process.
func (ndb *nodeDB) saveNodeFromPruning(node *Node) error {
if ndb.IsCommitting() {
// if the nodeDB is committing, the pruning process will be done after the committing.
<-ndb.chCommitting
}

ndb.mtx.Lock()
defer ndb.mtx.Unlock()

// Save node bytes to db.
var buf bytes.Buffer
buf.Grow(node.encodedSize())
if err := node.writeBytes(&buf); err != nil {
return err
}
return ndb.batch.Set(ndb.nodeKey(node.GetKey()), buf.Bytes())
}

// deleteVersion deletes a tree version from disk.
// deletes orphans
func (ndb *nodeDB) deleteVersion(version int64) error {
Expand All @@ -342,7 +407,7 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
if orphan.nodeKey.nonce == 0 && !orphan.isLegacy {
// if the orphan is a reformatted root, it can be a legacy root
// so it should be removed from the pruning process.
if err := ndb.batch.Delete(ndb.legacyNodeKey(orphan.hash)); err != nil {
if err := ndb.deleteFromPruning(ndb.legacyNodeKey(orphan.hash)); err != nil {
return err
}
}
Expand All @@ -354,9 +419,9 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
}
nk := orphan.GetKey()
if orphan.isLegacy {
return ndb.batch.Delete(ndb.legacyNodeKey(nk))
return ndb.deleteFromPruning(ndb.legacyNodeKey(nk))
}
return ndb.batch.Delete(ndb.nodeKey(nk))
return ndb.deleteFromPruning(ndb.nodeKey(nk))
}); err != nil {
return err
}
Expand All @@ -365,7 +430,7 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
if rootKey == nil || !bytes.Equal(rootKey, literalRootKey) {
// if the root key is not matched with the literal root key, it means the given root
// is a reference root to the previous version.
if err := ndb.batch.Delete(ndb.nodeKey(literalRootKey)); err != nil {
if err := ndb.deleteFromPruning(ndb.nodeKey(literalRootKey)); err != nil {
return err
}
}
Expand All @@ -381,12 +446,12 @@ func (ndb *nodeDB) deleteVersion(version int64) error {
return err
}
// ensure that the given version is not included in the root search
if err := ndb.batch.Delete(ndb.nodeKey(literalRootKey)); err != nil {
if err := ndb.deleteFromPruning(ndb.nodeKey(literalRootKey)); err != nil {
return err
}
// instead, the root should be reformatted to (version, 0)
root.nodeKey.nonce = 0
if err := ndb.SaveNode(root); err != nil {
if err := ndb.saveNodeFromPruning(root); err != nil {
return err
}
}
Expand Down Expand Up @@ -420,36 +485,30 @@ func (ndb *nodeDB) deleteLegacyNodes(version int64, nk []byte) error {

// deleteLegacyVersions deletes all legacy versions from disk.
func (ndb *nodeDB) deleteLegacyVersions(legacyLatestVersion int64) error {
count := 0

checkDeletePause := func() {
count++
if count%deleteBatchCount == 0 {
time.Sleep(deletePauseDuration)
count = 0
}
// Delete the last version for the legacyLastVersion
if err := ndb.traverseOrphans(legacyLatestVersion, legacyLatestVersion+1, func(orphan *Node) error {
return ndb.deleteFromPruning(ndb.legacyNodeKey(orphan.hash))
}); err != nil {
return err
}

// Delete orphans for all legacy versions
if err := ndb.traversePrefix(legacyOrphanKeyFormat.Key(), func(key, value []byte) error {
checkDeletePause()
if err := ndb.batch.Delete(key); err != nil {
if err := ndb.deleteFromPruning(key); err != nil {
return err
}
var fromVersion, toVersion int64
legacyOrphanKeyFormat.Scan(key, &toVersion, &fromVersion)
if (fromVersion <= legacyLatestVersion && toVersion < legacyLatestVersion) || fromVersion > legacyLatestVersion {
checkDeletePause()
return ndb.batch.Delete(ndb.legacyNodeKey(value))
return ndb.deleteFromPruning(ndb.legacyNodeKey(value))
}
return nil
}); err != nil {
return err
}
// Delete all legacy roots
if err := ndb.traversePrefix(legacyRootKeyFormat.Key(), func(key, _ []byte) error {
checkDeletePause()
return ndb.batch.Delete(key)
return ndb.deleteFromPruning(key)
}); err != nil {
return err
}
Expand Down Expand Up @@ -515,8 +574,45 @@ func (ndb *nodeDB) DeleteVersionsFrom(fromVersion int64) error {
return nil
}

// startPruning starts the pruning process.
func (ndb *nodeDB) startPruning() {
for {
ndb.mtx.Lock()
toVersion := ndb.pruneVersion
ndb.mtx.Unlock()

if toVersion == 0 {
time.Sleep(100 * time.Millisecond)
continue
}

if err := ndb.deleteVersionsTo(toVersion); err != nil {
ndb.logger.Error("Error while pruning", "err", err)
time.Sleep(1 * time.Second)
continue
}

ndb.mtx.Lock()
if ndb.pruneVersion <= toVersion {
ndb.pruneVersion = 0
}
ndb.mtx.Unlock()
}
}

// DeleteVersionsTo deletes the oldest versions up to the given version from disk.
func (ndb *nodeDB) DeleteVersionsTo(toVersion int64) error {
if !ndb.opts.AsyncPruning {
return ndb.deleteVersionsTo(toVersion)
}

ndb.mtx.Lock()
defer ndb.mtx.Unlock()
ndb.pruneVersion = toVersion
return nil
}

func (ndb *nodeDB) deleteVersionsTo(toVersion int64) error {
legacyLatestVersion, err := ndb.getLegacyLatestVersion()
if err != nil {
return err
Expand Down Expand Up @@ -553,20 +649,12 @@ func (ndb *nodeDB) DeleteVersionsTo(toVersion int64) error {

// Delete the legacy versions
if legacyLatestVersion >= first {
// Delete the last version for the legacyLastVersion
if err := ndb.traverseOrphans(legacyLatestVersion, legacyLatestVersion+1, func(orphan *Node) error {
return ndb.batch.Delete(ndb.legacyNodeKey(orphan.hash))
}); err != nil {
return err
if err := ndb.deleteLegacyVersions(legacyLatestVersion); err != nil {
ndb.logger.Error("Error deleting legacy versions", "err", err)
}
first = legacyLatestVersion + 1
// reset the legacy latest version forcibly to avoid multiple calls
ndb.resetLegacyLatestVersion(-1)
go func() {
if err := ndb.deleteLegacyVersions(legacyLatestVersion); err != nil {
ndb.logger.Error("Error deleting legacy versions", "err", err)
}
}()
first = legacyLatestVersion + 1
}

for version := first; version <= toVersion; version++ {
Expand Down
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ type Options struct {

// Ethereum has found that commit of 100KB is optimal, ref ethereum/go-ethereum#15115
FlushThreshold int

// AsyncPruning is a flag to enable async pruning
AsyncPruning bool
}

// DefaultOptions returns the default options for IAVL.
Expand Down Expand Up @@ -118,3 +121,10 @@ func FlushThresholdOption(ft int) Option {
opts.FlushThreshold = ft
}
}

// AsyncPruningOption sets the AsyncPruning for the tree.
func AsyncPruningOption(asyncPruning bool) Option {
return func(opts *Options) {
opts.AsyncPruning = asyncPruning
}
}
Loading