From c1d1ef05acf80b0393104af6b9a7e143b32eba82 Mon Sep 17 00:00:00 2001 From: Zhong Qiu <36867992+zhongqiuwood@users.noreply.github.com> Date: Thu, 5 May 2022 05:37:57 +0800 Subject: [PATCH] Merge PR: refactor orphan map (#1969) * refactor orphan map (#1960) * refactor orphan map * func (oi *OrphanInfo) enqueueResult(res int64) * rename * rename * rm oi.orphanItemCacheQueue * rm oi.orphanItemCacheQueue * type NodeCache struct * simplify names * simplify names --- dev/start.sh | 5 +- libs/iavl/mutable_tree.go | 5 +- libs/iavl/mutable_tree_oec.go | 56 ++++++------- libs/iavl/mutable_tree_oec_test.go | 11 ++- libs/iavl/nodedb.go | 30 ++----- libs/iavl/nodedb_cache.go | 48 +++++------ libs/iavl/nodedb_oec.go | 117 ++++++++++----------------- libs/iavl/nodedb_oec_test.go | 13 +-- libs/iavl/nodedb_orphan.go | 63 +++++++++++++++ libs/iavl/nodedb_orphan_info.go | 124 +++++++++++++++++++++++++++++ 10 files changed, 303 insertions(+), 169 deletions(-) create mode 100644 libs/iavl/nodedb_orphan.go create mode 100644 libs/iavl/nodedb_orphan_info.go diff --git a/dev/start.sh b/dev/start.sh index 5549a229ac..3a85228f4b 100755 --- a/dev/start.sh +++ b/dev/start.sh @@ -23,7 +23,8 @@ killbyname() { run() { LOG_LEVEL=main:debug,iavl:info,*:error,state:info,provider:info - exchaind start --pruning=nothing --rpc.unsafe \ +# exchaind start --pruning=nothing --rpc.unsafe \ + exchaind start --rpc.unsafe \ --local-rpc-port 26657 \ --log_level $LOG_LEVEL \ --log_file json \ @@ -33,7 +34,7 @@ run() { --iavl-enable-async-commit \ --enable-gid \ --append-pid=true \ - --iavl-commit-interval-height 10 \ + --iavl-commit-interval-height 5 \ --iavl-output-modules evm=0,acc=0 \ --trace --home $HOME_SERVER --chain-id $CHAINID \ --elapsed Round=1,CommitRound=1,Produce=1 \ diff --git a/libs/iavl/mutable_tree.go b/libs/iavl/mutable_tree.go index 42bb11f001..a327b5087b 100644 --- a/libs/iavl/mutable_tree.go +++ b/libs/iavl/mutable_tree.go @@ -121,9 +121,8 @@ func (tree *MutableTree) IsEmpty() bool { // VersionExists returns whether or not a version exists. func (tree *MutableTree) VersionExists(version int64) bool { - tree.ndb.mtx.Lock() - defer tree.ndb.mtx.Unlock() - if tree.ndb.heightOrphansMap[version] != nil { + _, ok := tree.ndb.findRootHash(version) + if ok { return true } return tree.versions.Get(version) diff --git a/libs/iavl/mutable_tree_oec.go b/libs/iavl/mutable_tree_oec.go index 9f4c22cffa..5dbaf0bbaa 100644 --- a/libs/iavl/mutable_tree_oec.go +++ b/libs/iavl/mutable_tree_oec.go @@ -41,9 +41,12 @@ type commitEvent struct { iavlHeight int } + func (tree *MutableTree) SaveVersionAsync(version int64, useDeltas bool) ([]byte, int64, error) { - moduleName := tree.GetModuleName() - oldRoot, saved := tree.hasSaved(version) + + tree.ndb.sanityCheckHandleOrphansResult(version) + + oldRoot, saved := tree.ndb.findRootHash(version) if saved { return nil, version, fmt.Errorf("existing version: %d, root: %X", version, oldRoot) } @@ -65,18 +68,19 @@ func (tree *MutableTree) SaveVersionAsync(version int64, useDeltas bool) ([]byte } } - tree.ndb.SaveOrphansAsync(version, tree.orphans) - - shouldPersist := (version-tree.lastPersistHeight >= CommitIntervalHeight) || - (treeMap.totalPreCommitCacheSize >= MinCommitItemCount) + shouldPersist := (version-tree.lastPersistHeight >= CommitIntervalHeight) || (treeMap.totalPreCommitCacheSize >= MinCommitItemCount) + newOrphans := tree.orphans if shouldPersist { - batch := tree.NewBatch() - if err := tree.persist(batch, version); err != nil { - return nil, 0, err - } + tree.ndb.saveNewOrphans(version, newOrphans, true) + tree.persist(version) + newOrphans = nil } + return tree.setNewWorkingTree(version, newOrphans, shouldPersist) +} + +func (tree *MutableTree) setNewWorkingTree(version int64, newOrphans []*Node, persisted bool) ([]byte, int64, error) { // set new working tree tree.ImmutableTree = tree.ImmutableTree.clone() tree.lastSaved = tree.ImmutableTree.clone() @@ -84,15 +88,14 @@ func (tree *MutableTree) SaveVersionAsync(version int64, useDeltas bool) ([]byte for k := range tree.savedNodes { delete(tree.savedNodes, k) } - rootHash := tree.lastSaved.Hash() - tree.setHeightOrphansItem(version, rootHash) + tree.ndb.enqueueOrphanTask(version, rootHash, newOrphans) tree.version = version - if shouldPersist { + if persisted { tree.versions.Set(version, true) } - treeMap.updateMutableTreeMap(moduleName) + treeMap.updateMutableTreeMap(tree.GetModuleName()) tree.removedVersions.Range(func(k, v interface{}) bool { tree.log(IavlDebug, "remove version from tree version map", "Height", k.(int64)) @@ -109,7 +112,9 @@ func (tree *MutableTree) removeVersion(version int64) { tree.versions.Delete(version) } -func (tree *MutableTree) persist(batch dbm.Batch, version int64) error { +func (tree *MutableTree) persist(version int64) { + var err error + batch := tree.NewBatch() tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0} var tpp map[string]*Node = nil if EnablePruningHistoryState { @@ -117,15 +122,17 @@ func (tree *MutableTree) persist(batch dbm.Batch, version int64) error { } if tree.root == nil { // There can still be orphans, for example if the root is the node being removed. - if err := tree.ndb.SaveEmptyRoot(batch, version); err != nil { - return err - } + err = tree.ndb.SaveEmptyRoot(batch, version) } else { - if err := tree.ndb.SaveRoot(batch, tree.root, version); err != nil { - return err - } + err = tree.ndb.SaveRoot(batch, tree.root, version) tpp = tree.ndb.asyncPersistTppStart(version) } + + if err != nil { + // never going to happen in case of AC enabled + panic(err) + } + for k := range tree.commitOrphans { delete(tree.commitOrphans, k) } @@ -133,7 +140,6 @@ func (tree *MutableTree) persist(batch dbm.Batch, version int64) error { tree.commitCh <- commitEvent{version, versions, batch, tpp, nil, int(tree.Height())} tree.lastPersistHeight = version - return nil } func (tree *MutableTree) commitSchedule() { @@ -217,9 +223,6 @@ func (tree *MutableTree) log(level int, msg string, kvs ...interface{}) { iavlLog(tree.GetModuleName(), level, msg, kvs...) } -func (tree *MutableTree) setHeightOrphansItem(version int64, rootHash []byte) { - tree.ndb.setHeightOrphansItem(version, rootHash) -} func (tree *MutableTree) updateCommittedStateHeightPool(batch dbm.Batch, version int64, versions map[int64]bool) { queue := tree.committedHeightQueue @@ -291,9 +294,6 @@ func (tree *MutableTree) addOrphansOptimized(orphans []*Node) { } } -func (tree *MutableTree) hasSaved(version int64) ([]byte, bool) { - return tree.ndb.inVersionCacheMap(version) -} func (tree *MutableTree) deepCopyVersions() map[int64]bool { if !EnablePruningHistoryState { diff --git a/libs/iavl/mutable_tree_oec_test.go b/libs/iavl/mutable_tree_oec_test.go index 251c9a111d..322c16dc8a 100644 --- a/libs/iavl/mutable_tree_oec_test.go +++ b/libs/iavl/mutable_tree_oec_test.go @@ -110,8 +110,11 @@ func TestSaveVersionCommitIntervalHeight(t *testing.T) { tree.Set([]byte(k2), []byte("k22")) _, _, _, err = tree.SaveVersion(false) - require.Equal(t, 5, len(tree.ndb.prePersistNodeCache)+len(tree.ndb.nodeCache)) - require.Equal(t, 3, len(tree.ndb.orphanNodeCache)) + tree.ndb.sanityCheckHandleOrphansResult(tree.version+1) + tree.ndb.oi.enqueueResult(tree.version) + + require.Equal(t, 5, len(tree.ndb.prePersistNodeCache)+tree.ndb.nc.nodeCacheLen()) + require.Equal(t, 3, tree.ndb.oi.orphanNodeCacheLen()) _, _, _, err = tree.SaveVersion(false) require.NoError(t, err) @@ -124,7 +127,7 @@ func TestSaveVersionCommitIntervalHeight(t *testing.T) { _, _, _, err = tree.SaveVersion(false) require.NoError(t, err) require.Equal(t, 0, len(tree.ndb.prePersistNodeCache)) - require.Equal(t, 0, len(tree.ndb.orphanNodeCache)) + require.Equal(t, 0, tree.ndb.oi.orphanNodeCacheLen()) //require.Equal(t, 5, len(tree.ndb.nodeCache)+len(tree.ndb.tempPrePersistNodeCache)) tree.Set([]byte("k5"), []byte("5555555555")) @@ -467,7 +470,7 @@ func TestStopTree(t *testing.T) { _, _, _, err := tree.SaveVersion(false) require.NoError(t, err) tree.StopTree() - require.Equal(t, 5, len(tree.ndb.nodeCache)) + require.Equal(t, 5, tree.ndb.nc.nodeCacheLen()) } func TestLog(t *testing.T) { diff --git a/libs/iavl/nodedb.go b/libs/iavl/nodedb.go index b8cce036a0..0a3e627f60 100644 --- a/libs/iavl/nodedb.go +++ b/libs/iavl/nodedb.go @@ -47,18 +47,6 @@ type nodeDB struct { latestVersion int64 - //lruNodeCache *lru.Cache - - nodeCache map[string]*list.Element // Node cache. - nodeCacheSize int // Node cache size limit in elements. - nodeCacheQueue *syncList // LRU queue of cache elements. Used for deletion. - nodeCacheMutex sync.RWMutex // Mutex for node cache. - - orphanNodeCache map[string]*Node - heightOrphansCacheQueue *list.List - heightOrphansCacheSize int - heightOrphansMap map[int64]*heightOrphansItem - prePersistNodeCache map[string]*Node tppMap map[int64]*tppItem tppVersionList *list.List @@ -76,6 +64,9 @@ type nodeDB struct { name string preWriteNodeCache cmap.ConcurrentMap + + oi *OrphanInfo + nc *NodeCache } func makeNodeCacheMap(cacheSize int, initRatio float64) map[string]*list.Element { @@ -97,25 +88,16 @@ func newNodeDB(db dbm.DB, cacheSize int, opts *Options) *nodeDB { ndb := &nodeDB{ db: db, opts: *opts, - latestVersion: 0, // initially invalid - nodeCache: makeNodeCacheMap(cacheSize, IavlCacheInitRatio), - nodeCacheSize: cacheSize, - nodeCacheQueue: newSyncList(), versionReaders: make(map[int64]uint32, 8), - orphanNodeCache: make(map[string]*Node), - heightOrphansCacheQueue: list.New(), - heightOrphansCacheSize: HeightOrphansCacheSize, - heightOrphansMap: make(map[int64]*heightOrphansItem), prePersistNodeCache: make(map[string]*Node), tppMap: make(map[int64]*tppItem), tppVersionList: list.New(), - dbReadCount: 0, - dbReadTime: 0, - dbWriteCount: 0, name: ParseDBName(db), preWriteNodeCache: cmap.New(), } + ndb.oi = newOrphanInfo(ndb) + ndb.nc = newNodeCache(cacheSize) return ndb } @@ -138,7 +120,7 @@ func (ndb *nodeDB) getNodeFromMemory(hash []byte, promoteRecentNode bool) *Node return elem } - if elem, ok := ndb.orphanNodeCache[string(hash)]; ok { + if elem := ndb.oi.getNodeFromOrphanCache(hash); elem != nil { return elem } diff --git a/libs/iavl/nodedb_cache.go b/libs/iavl/nodedb_cache.go index 392c3a0850..7ecf66e550 100644 --- a/libs/iavl/nodedb_cache.go +++ b/libs/iavl/nodedb_cache.go @@ -1,41 +1,32 @@ package iavl import ( - cmap "github.com/orcaman/concurrent-map" - "github.com/tendermint/go-amino" + "container/list" "github.com/okex/exchain/libs/iavl/config" - + "github.com/tendermint/go-amino" + "sync" ) -func (ndb *nodeDB) uncacheNodeRontine(n []*Node) { - for _, node := range n { - ndb.uncacheNode(node.hash) - } +type NodeCache struct { + nodeCache map[string]*list.Element // Node cache. + nodeCacheSize int // Node cache size limit in elements. + nodeCacheQueue *syncList // LRU queue of cache elements. Used for deletion. + nodeCacheMutex sync.RWMutex // Mutex for node cache. } -func (ndb *nodeDB) initPreWriteCache() { - if ndb.preWriteNodeCache == nil { - ndb.preWriteNodeCache = cmap.New() +func newNodeCache(cacheSize int) *NodeCache { + return &NodeCache{ + nodeCache: makeNodeCacheMap(cacheSize, IavlCacheInitRatio), + nodeCacheSize: cacheSize, + nodeCacheQueue: newSyncList(), } } -func (ndb *nodeDB) cacheNodeToPreWriteCache(n *Node) { - ndb.preWriteNodeCache.Set(string(n.hash), n) -} - -func (ndb *nodeDB) finishPreWriteCache() { - ndb.preWriteNodeCache.IterCb(func(key string, v interface{}) { - ndb.cacheNode(v.(*Node)) - }) - ndb.preWriteNodeCache = nil -} - - // =================================================== // ======= map[string]*list.Element implementation // =================================================== -func (ndb *nodeDB) uncacheNode(hash []byte) { +func (ndb *NodeCache) uncache(hash []byte) { ndb.nodeCacheMutex.Lock() if elem, ok := ndb.nodeCache[string(hash)]; ok { ndb.nodeCacheQueue.Remove(elem) @@ -46,7 +37,7 @@ func (ndb *nodeDB) uncacheNode(hash []byte) { // Add a node to the cache and pop the least recently used node if we've // reached the cache size limit. -func (ndb *nodeDB) cacheNode(node *Node) { +func (ndb *NodeCache) cache(node *Node) { ndb.nodeCacheMutex.Lock() elem := ndb.nodeCacheQueue.PushBack(node) ndb.nodeCache[string(node.hash)] = elem @@ -59,17 +50,16 @@ func (ndb *nodeDB) cacheNode(node *Node) { ndb.nodeCacheMutex.Unlock() } -func (ndb *nodeDB) cacheNodeByCheck(node *Node) { +func (ndb *NodeCache) cacheByCheck(node *Node) { ndb.nodeCacheMutex.RLock() _, ok := ndb.nodeCache[string(node.hash)] ndb.nodeCacheMutex.RUnlock() if !ok { - ndb.cacheNode(node) + ndb.cache(node) } } - -func (ndb *nodeDB) getNodeFromCache(hash []byte, promoteRecentNode bool) (n *Node) { +func (ndb *NodeCache) get(hash []byte, promoteRecentNode bool) (n *Node) { // Check the cache. ndb.nodeCacheMutex.RLock() elem, ok := ndb.nodeCache[string(hash)] @@ -84,7 +74,7 @@ func (ndb *nodeDB) getNodeFromCache(hash []byte, promoteRecentNode bool) (n *Nod return } -func (ndb *nodeDB) nodeCacheLen() int { +func (ndb *NodeCache) nodeCacheLen() int { return len(ndb.nodeCache) } diff --git a/libs/iavl/nodedb_oec.go b/libs/iavl/nodedb_oec.go index b8d7e7fb17..2c25589d82 100644 --- a/libs/iavl/nodedb_oec.go +++ b/libs/iavl/nodedb_oec.go @@ -5,6 +5,7 @@ import ( "container/list" "encoding/binary" "fmt" + cmap "github.com/orcaman/concurrent-map" "github.com/tendermint/go-amino" @@ -28,12 +29,6 @@ var ( IavlCacheInitRatio float64 = 0 ) -type heightOrphansItem struct { - version int64 - rootHash []byte - orphans []*Node -} - type tppItem struct { nodeMap map[string]*Node listItem *list.Element @@ -50,50 +45,6 @@ func (ndb *nodeDB) SaveOrphans(batch dbm.Batch, version int64, orphans []*Node) } } -func (ndb *nodeDB) SaveOrphansAsync(version int64, orphans []*Node) { - ndb.log(IavlDebug, "saving orphan node to OrphanCache", "size", len(orphans)) - version-- - atomic.AddInt64(&ndb.totalOrphanCount, int64(len(orphans))) - - ndb.mtx.Lock() - defer ndb.mtx.Unlock() - - orphansObj := ndb.heightOrphansMap[version] - if orphansObj != nil { - orphansObj.orphans = orphans - } - for _, node := range orphans { - ndb.orphanNodeCache[string(node.hash)] = node - delete(ndb.prePersistNodeCache, amino.BytesToStr(node.hash)) - node.leftNode = nil - node.rightNode = nil - } - go ndb.uncacheNodeRontine(orphans) -} - -func (ndb *nodeDB) setHeightOrphansItem(version int64, rootHash []byte) { - if rootHash == nil { - rootHash = []byte{} - } - orphanObj := &heightOrphansItem{ - version: version, - rootHash: rootHash, - } - ndb.mtx.Lock() - defer ndb.mtx.Unlock() - ndb.heightOrphansCacheQueue.PushBack(orphanObj) - ndb.heightOrphansMap[version] = orphanObj - - for ndb.heightOrphansCacheQueue.Len() > ndb.heightOrphansCacheSize { - orphans := ndb.heightOrphansCacheQueue.Front() - oldHeightOrphanItem := ndb.heightOrphansCacheQueue.Remove(orphans).(*heightOrphansItem) - for _, node := range oldHeightOrphanItem.orphans { - delete(ndb.orphanNodeCache, amino.BytesToStr(node.hash)) - } - delete(ndb.heightOrphansMap, oldHeightOrphanItem.version) - } -} - func (ndb *nodeDB) dbGet(k []byte) ([]byte, error) { ts := time.Now() defer func() { @@ -307,8 +258,8 @@ func (ndb *nodeDB) sprintCacheLog(version int64) string { printLog := fmt.Sprintf("Save Version<%d>: Tree<%s>", version, ndb.name) printLog += fmt.Sprintf(", TotalPreCommitCacheSize:%d", treeMap.totalPreCommitCacheSize) - printLog += fmt.Sprintf(", nodeCCnt:%d", ndb.nodeCacheLen()) - printLog += fmt.Sprintf(", orphanCCnt:%d", len(ndb.orphanNodeCache)) + printLog += fmt.Sprintf(", nodeCCnt:%d", ndb.nc.nodeCacheLen()) + printLog += fmt.Sprintf(", orphanCCnt:%d", ndb.oi.orphanNodeCacheLen()) printLog += fmt.Sprintf(", prePerCCnt:%d", len(ndb.prePersistNodeCache)) printLog += fmt.Sprintf(", dbRCnt:%d", ndb.getDBReadCount()) printLog += fmt.Sprintf(", dbWCnt:%d", ndb.getDBWriteCount()) @@ -474,15 +425,6 @@ func updateBranchAndSaveNodeToChan(node *Node, saveNodesCh chan<- *Node) []byte return node.hash } -func (ndb *nodeDB) getRootWithCache(version int64) ([]byte, error) { - ndb.mtx.Lock() - defer ndb.mtx.Unlock() - orphansObj := ndb.heightOrphansMap[version] - if orphansObj != nil { - return orphansObj.rootHash, nil - } - return nil, fmt.Errorf("version %d is not in heightOrphansMap", version) -} // Saves orphaned nodes to disk under a special prefix. // version: the new version being saved. @@ -510,23 +452,14 @@ func (ndb *nodeDB) getNodeInTpp(hash []byte) (*Node, bool) { func (ndb *nodeDB) getRootWithCacheAndDB(version int64) ([]byte, error) { if EnableAsyncCommit { - root, err := ndb.getRootWithCache(version) - if err == nil { - return root, err + root, ok := ndb.findRootHash(version) + if ok { + return root, nil } } return ndb.getRoot(version) } -func (ndb *nodeDB) inVersionCacheMap(version int64) ([]byte, bool) { - ndb.mtx.Lock() - defer ndb.mtx.Unlock() - item := ndb.heightOrphansMap[version] - if item != nil { - return item.rootHash, true - } - return nil, false -} // DeleteVersion deletes a tree version from disk. func (ndb *nodeDB) DeleteVersion(batch dbm.Batch, version int64, checkLatestVersion bool) error { @@ -578,3 +511,41 @@ func orphanKeyFast(fromVersion, toVersion int64, hash []byte) []byte { copy(key[n+hashLen-len(hash):n+hashLen], hash) return key } + +func (ndb *nodeDB) cacheNode(node *Node) { + ndb.nc.cache(node) +} +func (ndb *nodeDB) uncacheNode(hash []byte) { + ndb.nc.uncache(hash) +} + +func (ndb *nodeDB) getNodeFromCache(hash []byte, promoteRecentNode bool) (n *Node) { + return ndb.nc.get(hash, promoteRecentNode) +} + +func (ndb *nodeDB) cacheNodeByCheck(node *Node) { + ndb.nc.cacheByCheck(node) +} + +func (ndb *nodeDB) uncacheNodeRontine(n []*Node) { + for _, node := range n { + ndb.uncacheNode(node.hash) + } +} + +func (ndb *nodeDB) initPreWriteCache() { + if ndb.preWriteNodeCache == nil { + ndb.preWriteNodeCache = cmap.New() + } +} + +func (ndb *nodeDB) cacheNodeToPreWriteCache(n *Node) { + ndb.preWriteNodeCache.Set(string(n.hash), n) +} + +func (ndb *nodeDB) finishPreWriteCache() { + ndb.preWriteNodeCache.IterCb(func(key string, v interface{}) { + ndb.cacheNode(v.(*Node)) + }) + ndb.preWriteNodeCache = nil +} \ No newline at end of file diff --git a/libs/iavl/nodedb_oec_test.go b/libs/iavl/nodedb_oec_test.go index 931c860b86..9327f0b34c 100644 --- a/libs/iavl/nodedb_oec_test.go +++ b/libs/iavl/nodedb_oec_test.go @@ -340,16 +340,17 @@ func Test_getRootWithCache(t *testing.T) { ndb := mockNodeDB() for _, c := range cases { rootHash := randBytes(32) - ndb.heightOrphansMap[c.version] = &heightOrphansItem{c.version, rootHash, nil} + ndb.oi.orphanItemMap[c.version] = &orphanItem{rootHash, nil} - actualHash, err := ndb.getRootWithCache(c.version) + actualHash, ok := ndb.findRootHash(c.version) if c.exist { require.Equal(t, actualHash, rootHash) } else { require.Nil(t, actualHash) } - require.NoError(t, err) + require.Equal(t, ok, true) + var err error actualHash, err = ndb.getRootWithCacheAndDB(c.version) if c.exist { require.Equal(t, actualHash, rootHash) @@ -374,9 +375,9 @@ func Test_inVersionCacheMap(t *testing.T) { ndb := mockNodeDB() for _, c := range cases { rootHash := randBytes(32) - orphanObj := &heightOrphansItem{version: c.version, rootHash: rootHash} - ndb.heightOrphansMap[c.version] = orphanObj - actualHash, existed := ndb.inVersionCacheMap(c.version) + orphanObj := &orphanItem{rootHash: rootHash} + ndb.oi.orphanItemMap[c.version] = orphanObj + actualHash, existed := ndb.findRootHash(c.version) require.Equal(t, actualHash, rootHash) require.Equal(t, existed, c.expected) } diff --git a/libs/iavl/nodedb_orphan.go b/libs/iavl/nodedb_orphan.go new file mode 100644 index 0000000000..d7214f206e --- /dev/null +++ b/libs/iavl/nodedb_orphan.go @@ -0,0 +1,63 @@ +package iavl + +import ( + "github.com/tendermint/go-amino" + "sync/atomic" +) + +func (ndb *nodeDB) enqueueOrphanTask(version int64, rootHash []byte, newOrphans []*Node) { + + ndb.addOrphanItem(version, rootHash) + + task := func() { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + ndb.saveNewOrphans(version, newOrphans, false) + ndb.oi.removeOldOrphans(version) + ndb.oi.enqueueResult(version) + } + + ndb.oi.enqueueTask(task) +} + +func (ndb *nodeDB) addOrphanItem(version int64, rootHash []byte) { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + ndb.oi.addOrphanItem(version, rootHash) +} + +func (ndb *nodeDB) saveNewOrphans(version int64, orphans []*Node, lock bool) { + + if orphans == nil { + return + } + + version-- + ndb.log(IavlDebug, "saving orphan node to OrphanCache", "size", len(orphans)) + atomic.AddInt64(&ndb.totalOrphanCount, int64(len(orphans))) + + if lock { + ndb.mtx.Lock() + defer ndb.mtx.Unlock() + } + + ndb.oi.feedOrphansMap(version, orphans) + for _, node := range orphans { + ndb.oi.feedOrphanNodeCache(node) + delete(ndb.prePersistNodeCache, amino.BytesToStr(node.hash)) + node.leftNode = nil + node.rightNode = nil + } + ndb.uncacheNodeRontine(orphans) +} + +func (ndb *nodeDB) sanityCheckHandleOrphansResult(version int64) { + ndb.oi.wait4Result(version) +} + +func (ndb *nodeDB) findRootHash(version int64) (res []byte, found bool) { + ndb.mtx.RLock() + defer ndb.mtx.RUnlock() + return ndb.oi.findRootHash(version) +} + diff --git a/libs/iavl/nodedb_orphan_info.go b/libs/iavl/nodedb_orphan_info.go new file mode 100644 index 0000000000..000ef77d2b --- /dev/null +++ b/libs/iavl/nodedb_orphan_info.go @@ -0,0 +1,124 @@ +package iavl + +import ( + "fmt" + "github.com/tendermint/go-amino" +) + +type OrphanInfo struct { + ndb *nodeDB + orphanNodeCache map[string]*Node + orphanItemMap map[int64]*orphanItem + itemSize int + + orphanTaskChan chan func() + resultChan chan int64 +} + +type orphanItem struct { + rootHash []byte + orphans []*Node +} + +func newOrphanInfo(ndb *nodeDB) *OrphanInfo { + + oi := &OrphanInfo{ + ndb: ndb, + orphanNodeCache: make(map[string]*Node), + orphanItemMap: make(map[int64]*orphanItem), + itemSize: HeightOrphansCacheSize, + orphanTaskChan: make(chan func(), 1), + resultChan: make(chan int64, 1), + } + + oi.enqueueResult(0) + go oi.handleOrphansRoutine() + return oi +} + +func (oi *OrphanInfo) enqueueResult(res int64) { + oi.resultChan <- res +} + +func (oi *OrphanInfo) enqueueTask(t func()) { + oi.orphanTaskChan <- t +} + +func (oi *OrphanInfo) handleOrphansRoutine() { + for task := range oi.orphanTaskChan { + task() + } +} + +func (oi *OrphanInfo) wait4Result(version int64) { + + version-- + for versionCompleted := range oi.resultChan { + if versionCompleted == version { + break + } else if versionCompleted == 0 { + break + } + } +} + +func (oi *OrphanInfo) addOrphanItem(version int64, rootHash []byte) { + if rootHash == nil { + rootHash = []byte{} + } + orphanObj := &orphanItem{ + rootHash: rootHash, + } + _, ok := oi.orphanItemMap[version] + if ok { + panic(fmt.Sprintf("unexpected orphanItemMap, version: %d", version)) + } + oi.orphanItemMap[version] = orphanObj +} + + +func (oi *OrphanInfo) removeOldOrphans(version int64) { + expiredVersion := version-int64(oi.itemSize) + expiredItem, ok := oi.orphanItemMap[expiredVersion] + if !ok { + return + } + for _, node := range expiredItem.orphans { + delete(oi.orphanNodeCache, amino.BytesToStr(node.hash)) + } + delete(oi.orphanItemMap, expiredVersion) +} + +func (oi *OrphanInfo) feedOrphansMap(version int64, orphans []*Node) { + v, ok := oi.orphanItemMap[version] + if !ok { + return + } + v.orphans = orphans +} + +func (oi *OrphanInfo) feedOrphanNodeCache(node *Node) { + oi.orphanNodeCache[string(node.hash)] = node +} + + +func (oi *OrphanInfo) getNodeFromOrphanCache(hash []byte) *Node { + elem, ok := oi.orphanNodeCache[string(hash)] + if ok { + return elem + } + return nil +} + +func (oi *OrphanInfo) orphanNodeCacheLen() int { + return len(oi.orphanNodeCache) +} + +func (oi *OrphanInfo) findRootHash(version int64) (res []byte, found bool) { + v, ok := oi.orphanItemMap[version] + if ok { + res = v.rootHash + found = true + } + return +} \ No newline at end of file