From a197d9021c318f73f6b40fa8bae683b1620f8bb7 Mon Sep 17 00:00:00 2001 From: cwbhhjl Date: Tue, 13 Dec 2022 17:11:36 +0800 Subject: [PATCH] Merge PR: optimize db batch in async commit (#2858) * mv saveOrphans to commit event * save orphans to db * save node to db * flag * update * update * update ac log * nodeToDBValue * pruning with single batch * test * revert * rm fss writeToDB * rm fss writeToDB * rm fss writeToDB * reduce diff * use batch if stoptree * when nobatch is turned off, keep the same as before * fix ut * dynamic config * rm unused code Co-authored-by: xiangjianmeng <805442788@qq.com> --- app/config/config.go | 19 +++++ libs/cosmos-sdk/server/start_okchain.go | 1 + .../store/rootmulti/rootmulti_store.go | 4 ++ libs/iavl/mutable_tree.go | 5 +- libs/iavl/mutable_tree_brenchmark_test.go | 2 +- libs/iavl/mutable_tree_oec.go | 41 ++++++++--- libs/iavl/mutable_tree_oec_test.go | 8 +-- libs/iavl/nodedb.go | 8 +++ libs/iavl/nodedb_oec.go | 72 +++++++++++++++---- libs/iavl/nodedb_oec_test.go | 2 +- .../config/dynamic_config_okchain.go | 5 ++ 11 files changed, 137 insertions(+), 30 deletions(-) diff --git a/app/config/config.go b/app/config/config.go index cf01a3c121..2bf2834871 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -115,6 +115,8 @@ type OecConfig struct { // enable broadcast hasBlockPartMsg enableHasBlockPartMsg bool gcInterval int + + iavlAcNoBatch bool } const ( @@ -296,6 +298,7 @@ func (c *OecConfig) loadFromConfig() { c.SetBlockPartSize(viper.GetInt(server.FlagBlockPartSizeBytes)) c.SetEnableHasBlockPartMsg(viper.GetBool(FlagEnableHasBlockPartMsg)) c.SetGcInterval(viper.GetInt(FlagDebugGcInterval)) + c.SetIavlAcNoBatch(viper.GetBool(tmiavl.FlagIavlCommitAsyncNoBatch)) } func resolveNodeKeyWhitelist(plain string) []string { @@ -366,6 +369,7 @@ func (c *OecConfig) format() string { iavl-fast-storage-cache-size: %d commit-gap-height: %d enable-analyzer: %v + iavl-commit-async-no-batch: %v active-view-change: %v`, system.ChainName, c.GetMempoolRecheck(), c.GetMempoolForceRecheckGap(), @@ -393,6 +397,7 @@ func (c *OecConfig) format() string { c.GetIavlFSCacheSize(), c.GetCommitGapHeight(), c.GetEnableAnalyzer(), + c.GetIavlAcNoBatch(), c.GetActiveVC(), ) } @@ -624,6 +629,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) { return } c.SetGcInterval(r) + case tmiavl.FlagIavlCommitAsyncNoBatch: + r, err := strconv.ParseBool(v) + if err != nil { + return + } + c.SetIavlAcNoBatch(r) } } @@ -1012,3 +1023,11 @@ func (c *OecConfig) GetEnableHasBlockPartMsg() bool { func (c *OecConfig) SetEnableHasBlockPartMsg(value bool) { c.enableHasBlockPartMsg = value } + +func (c *OecConfig) GetIavlAcNoBatch() bool { + return c.iavlAcNoBatch +} + +func (c *OecConfig) SetIavlAcNoBatch(value bool) { + c.iavlAcNoBatch = value +} diff --git a/libs/cosmos-sdk/server/start_okchain.go b/libs/cosmos-sdk/server/start_okchain.go index b2669d6d98..200c5d7b5e 100644 --- a/libs/cosmos-sdk/server/start_okchain.go +++ b/libs/cosmos-sdk/server/start_okchain.go @@ -201,6 +201,7 @@ func RegisterServerFlags(cmd *cobra.Command) *cobra.Command { cmd.Flags().Int(iavl.FlagIavlCacheSize, 10000000, "Max size of iavl cache") cmd.Flags().Float64(tmiavl.FlagIavlCacheInitRatio, 1, "iavl cache init ratio, 0.0~1.0, default is 0, iavl cache map would be init with (cache size * init ratio)") + cmd.Flags().Bool(tmiavl.FlagIavlCommitAsyncNoBatch, false, "experimental: iavl commit async without batch") cmd.Flags().StringToInt(tmiavl.FlagOutputModules, map[string]int{"evm": 1, "acc": 1}, "decide which module in iavl to be printed") cmd.Flags().Int64(tmiavl.FlagIavlCommitIntervalHeight, 100, "Max interval to commit node cache into leveldb") cmd.Flags().Int64(tmiavl.FlagIavlMinCommitItemCount, 1000000, "Min nodes num to triggle node cache commit") diff --git a/libs/cosmos-sdk/store/rootmulti/rootmulti_store.go b/libs/cosmos-sdk/store/rootmulti/rootmulti_store.go index 14089a6e91..9191879933 100644 --- a/libs/cosmos-sdk/store/rootmulti/rootmulti_store.go +++ b/libs/cosmos-sdk/store/rootmulti/rootmulti_store.go @@ -10,6 +10,8 @@ import ( "strings" "sync" + cfg "github.com/okex/exchain/libs/tendermint/config" + sdkmaps "github.com/okex/exchain/libs/cosmos-sdk/store/internal/maps" "github.com/okex/exchain/libs/cosmos-sdk/store/mem" "github.com/okex/exchain/libs/cosmos-sdk/store/mpt" @@ -593,6 +595,8 @@ func (rs *Store) CommitterCommit(*iavltree.TreeDelta) (_ types.CommitID, _ *iavl // Implements Committer/CommitStore. func (rs *Store) CommitterCommitMap(inputDeltaMap iavltree.TreeDeltaMap) (types.CommitID, iavltree.TreeDeltaMap) { + iavltree.IavlCommitAsyncNoBatch = cfg.DynamicConfig.GetIavlAcNoBatch() + previousHeight := rs.lastCommitInfo.Version version := previousHeight + 1 diff --git a/libs/iavl/mutable_tree.go b/libs/iavl/mutable_tree.go index d8b95a7a62..b36a4bd624 100644 --- a/libs/iavl/mutable_tree.go +++ b/libs/iavl/mutable_tree.go @@ -973,13 +973,16 @@ func (ndb *nodeDB) saveFastNodeVersion(batch dbm.Batch, fnc *fastNodeChanges, ve if !GetEnableFastStorage() || fnc == nil { return nil } + if err := ndb.setFastStorageVersionToBatch(batch, version); err != nil { + return err + } if err := ndb.saveFastNodeAdditions(batch, fnc.getAdditions()); err != nil { return err } if err := ndb.saveFastNodeRemovals(batch, fnc.getRemovals()); err != nil { return err } - return ndb.setFastStorageVersionToBatch(batch, version) + return nil } // nolint: unused diff --git a/libs/iavl/mutable_tree_brenchmark_test.go b/libs/iavl/mutable_tree_brenchmark_test.go index 4305643ad0..2ca2bf9c1e 100644 --- a/libs/iavl/mutable_tree_brenchmark_test.go +++ b/libs/iavl/mutable_tree_brenchmark_test.go @@ -37,7 +37,7 @@ func prepareTree(b *testing.B, openLogFlag bool, dbName string, size int) (*Muta //recursivePrint(tree.root, 0) tree.SaveVersion(false) - tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil} + tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil, nil, false} fmt.Println("init setting done") return tree, keySet, dataSet } diff --git a/libs/iavl/mutable_tree_oec.go b/libs/iavl/mutable_tree_oec.go index 0217c607d7..5c38e95ae1 100644 --- a/libs/iavl/mutable_tree_oec.go +++ b/libs/iavl/mutable_tree_oec.go @@ -48,6 +48,8 @@ type commitEvent struct { wg *sync.WaitGroup iavlHeight int fnc *fastNodeChanges + orphans []commitOrphan + isStop bool } type commitOrphan struct { @@ -159,11 +161,14 @@ func (tree *MutableTree) removeVersion(version int64) { func (tree *MutableTree) persist(version int64) { var err error batch := tree.NewBatch() - tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil} + tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil, nil, false} var tpp map[string]*Node = nil fnc := newFastNodeChanges() + + var orphans []commitOrphan if EnablePruningHistoryState { - tree.ndb.saveCommitOrphans(batch, version, tree.commitOrphans) + orphans = tree.commitOrphans + tree.commitOrphans = nil } if tree.root == nil { // There can still be orphans, for example if the root is the node being removed. @@ -183,7 +188,7 @@ func (tree *MutableTree) persist(version int64) { } versions := tree.deepCopyVersions() tree.commitCh <- commitEvent{version, versions, batch, - tpp, nil, int(tree.Height()), fnc} + tpp, nil, int(tree.Height()), fnc, orphans, false} tree.lastPersistHeight = version } @@ -201,9 +206,20 @@ func (tree *MutableTree) commitSchedule() { } continue } - + noBatch := false + if IavlCommitAsyncNoBatch && !event.isStop { + noBatch = true + } trc := trace.NewTracer("commitSchedule") + if len(event.orphans) != 0 { + trc.Pin("saveCommitOrphans") + err := tree.ndb.saveCommitOrphans(event.batch, event.version, event.orphans, noBatch) + if err != nil { + panic(err) + } + } + trc.Pin("cacheNode") for k, node := range event.tpp { if !node.persisted { @@ -213,9 +229,9 @@ func (tree *MutableTree) commitSchedule() { } trc.Pin("Pruning") - tree.updateCommittedStateHeightPool(event.batch, event.version, event.versions) + tree.updateCommittedStateHeightPool(event.batch, event.version, event.versions, noBatch) - tree.ndb.persistTpp(&event, trc) + tree.ndb.persistTpp(&event, noBatch, trc) if event.wg != nil { event.wg.Done() break @@ -278,7 +294,7 @@ func (tree *MutableTree) StopTreeWithVersion(version int64) { wg.Add(1) versions := tree.deepCopyVersions() - tree.commitCh <- commitEvent{tree.version, versions, batch, tpp, &wg, 0, fastNodeChanges} + tree.commitCh <- commitEvent{tree.version, versions, batch, tpp, &wg, 0, fastNodeChanges, nil, true} wg.Wait() } func (tree *MutableTree) StopTree() { @@ -289,7 +305,7 @@ func (tree *MutableTree) log(level int, msg string, kvs ...interface{}) { iavlLog(tree.GetModuleName(), level, msg, kvs...) } -func (tree *MutableTree) updateCommittedStateHeightPool(batch dbm.Batch, version int64, versions map[int64]bool) { +func (tree *MutableTree) updateCommittedStateHeightPool(batch dbm.Batch, version int64, versions map[int64]bool, writeToDB bool) { queue := tree.committedHeightQueue queue.PushBack(version) tree.committedHeightMap[version] = true @@ -300,13 +316,20 @@ func (tree *MutableTree) updateCommittedStateHeightPool(batch dbm.Batch, version delete(tree.committedHeightMap, oldVersion) if EnablePruningHistoryState { - + if writeToDB { + batch = tree.ndb.db.NewBatch() + } if err := tree.deleteVersion(batch, oldVersion, versions); err != nil { tree.log(IavlErr, "Failed to delete", "height", oldVersion, "error", err.Error()) } else { tree.log(IavlDebug, "History state removed", "version", oldVersion) tree.removedVersions.Store(oldVersion, nil) } + if writeToDB { + if err := tree.ndb.Commit(batch); err != nil { + panic(err) + } + } } } } diff --git a/libs/iavl/mutable_tree_oec_test.go b/libs/iavl/mutable_tree_oec_test.go index ea05d5e41d..c44401066e 100644 --- a/libs/iavl/mutable_tree_oec_test.go +++ b/libs/iavl/mutable_tree_oec_test.go @@ -286,7 +286,7 @@ func TestPruningHistoryState(t *testing.T) { batchSaveVersion(t, tree, minHistoryStateNum*int(CommitIntervalHeight)-2) - tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil} + tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil, nil, false} iTree, err := tree.GetImmutable(CommitIntervalHeight * (minHistoryStateNum - 1)) require.NoError(t, err) @@ -350,7 +350,7 @@ func TestPruningHistoryStateRandom(t *testing.T) { require.NoError(t, err) } - tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil} + tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil, nil, false} nodeCount := 0 tree.ndb.traverseNodes(func(hash []byte, node *Node) { @@ -538,9 +538,9 @@ func TestCommitSchedule(t *testing.T) { wg.Add(1) versions := tree.deepCopyVersions() batch := tree.NewBatch() - tree.commitCh <- commitEvent{CommitIntervalHeight, versions, batch, nil, nil, 0, nil} + tree.commitCh <- commitEvent{CommitIntervalHeight, versions, batch, nil, nil, 0, nil, nil, false} - tree.commitCh <- commitEvent{CommitIntervalHeight, versions, batch, nil, &wg, 0, nil} + tree.commitCh <- commitEvent{CommitIntervalHeight, versions, batch, nil, &wg, 0, nil, nil, false} wg.Wait() } diff --git a/libs/iavl/nodedb.go b/libs/iavl/nodedb.go index c1f901152a..0b6148e540 100644 --- a/libs/iavl/nodedb.go +++ b/libs/iavl/nodedb.go @@ -591,6 +591,14 @@ func (ndb *nodeDB) saveOrphan(batch dbm.Batch, hash []byte, fromVersion, toVersi batch.Set(key, hash) } +func (ndb *nodeDB) saveOrphanToDB(hash []byte, fromVersion, toVersion int64) error { + if fromVersion > toVersion { + panic(fmt.Sprintf("Orphan expires before it comes alive. %d > %d", fromVersion, toVersion)) + } + key := ndb.orphanKey(fromVersion, toVersion, hash) + return ndb.db.Set(key, hash) +} + func (ndb *nodeDB) log(level int, msg string, kv ...interface{}) { iavlLog(ndb.name, level, msg, kv...) } diff --git a/libs/iavl/nodedb_oec.go b/libs/iavl/nodedb_oec.go index 1f6a2a99c0..05f74f27fe 100644 --- a/libs/iavl/nodedb_oec.go +++ b/libs/iavl/nodedb_oec.go @@ -17,11 +17,13 @@ import ( ) const ( - FlagIavlCacheInitRatio = "iavl-cache-init-ratio" + FlagIavlCacheInitRatio = "iavl-cache-init-ratio" + FlagIavlCommitAsyncNoBatch = "iavl-commit-async-no-batch" ) var ( - IavlCacheInitRatio float64 = 0 + IavlCacheInitRatio float64 = 0 + IavlCommitAsyncNoBatch bool = false ) type tppItem struct { @@ -90,17 +92,29 @@ func (ndb *nodeDB) saveNodeToPrePersistCache(node *Node) { ndb.mtx.Unlock() } -func (ndb *nodeDB) persistTpp(event *commitEvent, trc *trace.Tracer) { +func (ndb *nodeDB) persistTpp(event *commitEvent, writeToDB bool, trc *trace.Tracer) { batch := event.batch tpp := event.tpp - trc.Pin("batchSet") - for _, node := range tpp { - ndb.batchSet(node, batch) + trc.Pin("batchSet-node") + if !writeToDB { + for _, node := range tpp { + ndb.batchSet(node, batch) + } + } else { + for _, node := range tpp { + err := ndb.saveNodeToDB(node) + if err != nil { + panic(err) + } + } } + ndb.state.increasePersistedCount(len(tpp)) ndb.addDBWriteCount(int64(len(tpp))) + trc.Pin("batchSet-fss") + if err := ndb.saveFastNodeVersion(batch, event.fnc, event.version); err != nil { panic(err) } @@ -111,7 +125,6 @@ func (ndb *nodeDB) persistTpp(event *commitEvent, trc *trace.Tracer) { } ndb.asyncPersistTppFinised(event, trc) - ndb.tpfv.remove(event.version) } func (ndb *nodeDB) asyncPersistTppStart(version int64) (map[string]*Node, *fastNodeChanges) { @@ -149,16 +162,19 @@ func (ndb *nodeDB) asyncPersistTppFinised(event *commitEvent, trc *trace.Tracer) nodeNum := ndb.tpp.getTppNodesNum() ndb.tpp.removeFromTpp(version) + ndb.tpfv.remove(event.version) ndb.log(IavlInfo, "CommitSchedule", "Height", version, "Tree", ndb.name, "IavlHeight", iavlHeight, "NodeNum", nodeNum, + "tpp", len(event.tpp), + "fss-add", len(event.fnc.additions), + "fss-rm", len(event.fnc.removals), "trc", trc.Format()) } -// SaveNode saves a node to disk. -func (ndb *nodeDB) batchSet(node *Node, batch dbm.Batch) { +func nodeToDBValue(node *Node) []byte { if node.hash == nil { panic("Expected to find node.hash, but none found.") } @@ -178,14 +194,31 @@ func (ndb *nodeDB) batchSet(node *Node, batch dbm.Batch) { panic(err) } + return buf.Bytes() +} + +// SaveNode saves a node to disk. +func (ndb *nodeDB) batchSet(node *Node, batch dbm.Batch) { nodeKey := ndb.nodeKey(node.hash) - nodeValue := buf.Bytes() + nodeValue := nodeToDBValue(node) + batch.Set(nodeKey, nodeValue) ndb.state.increasePersistedSize(len(nodeKey) + len(nodeValue)) ndb.log(IavlDebug, "BATCH SAVE", "hash", node.hash) //node.persisted = true // move to function MovePrePersistCacheToTempCache } +// SaveNode saves a node to disk. +func (ndb *nodeDB) saveNodeToDB(node *Node) error { + nodeKey := ndb.nodeKey(node.hash) + nodeValue := nodeToDBValue(node) + err := ndb.db.Set(nodeKey, nodeValue) + ndb.state.increasePersistedSize(len(nodeKey) + len(nodeValue)) + ndb.log(IavlDebug, "SAVE NODE", "hash", node.hash) + //node.persisted = true // move to function MovePrePersistCacheToTempCache + return err +} + func (ndb *nodeDB) NewBatch() dbm.Batch { return ndb.db.NewBatch() } @@ -193,13 +226,24 @@ func (ndb *nodeDB) NewBatch() dbm.Batch { // Saves orphaned nodes to disk under a special prefix. // version: the new version being saved. // orphans: the orphan nodes created since version-1 -func (ndb *nodeDB) saveCommitOrphans(batch dbm.Batch, version int64, orphans []commitOrphan) { +func (ndb *nodeDB) saveCommitOrphans(batch dbm.Batch, version int64, orphans []commitOrphan, writeToDB bool) error { ndb.log(IavlDebug, "saving committed orphan node log to disk") toVersion := ndb.getPreviousVersion(version) - for _, orphan := range orphans { - // ndb.log(IavlDebug, "SAVEORPHAN", "from", orphan.Version, "to", toVersion, "hash", amino.BytesHexStringer(orphan.NodeHash)) - ndb.saveOrphan(batch, orphan.NodeHash, orphan.Version, toVersion) + if !writeToDB { + for _, orphan := range orphans { + // ndb.log(IavlDebug, "SAVEORPHAN", "from", orphan.Version, "to", toVersion, "hash", amino.BytesHexStringer(orphan.NodeHash)) + ndb.saveOrphan(batch, orphan.NodeHash, orphan.Version, toVersion) + } + } else { + for _, orphan := range orphans { + // ndb.log(IavlDebug, "SAVEORPHAN", "from", orphan.Version, "to", toVersion, "hash", amino.BytesHexStringer(orphan.NodeHash)) + err := ndb.saveOrphanToDB(orphan.NodeHash, orphan.Version, toVersion) + if err != nil { + return err + } + } } + return nil } func (ndb *nodeDB) getRootWithCacheAndDB(version int64) ([]byte, error) { diff --git a/libs/iavl/nodedb_oec_test.go b/libs/iavl/nodedb_oec_test.go index ab471169a8..cba86cfa46 100644 --- a/libs/iavl/nodedb_oec_test.go +++ b/libs/iavl/nodedb_oec_test.go @@ -275,7 +275,7 @@ func Test_saveCommitOrphans(t *testing.T) { require.NoError(t, ndb.Commit(batch1)) batch2 := ndb.NewBatch() - ndb.saveCommitOrphans(batch2, c.version+1, commitOrphans) + ndb.saveCommitOrphans(batch2, c.version+1, commitOrphans, false) require.NoError(t, ndb.Commit(batch2)) for _, orphan := range commitOrphans { diff --git a/libs/tendermint/config/dynamic_config_okchain.go b/libs/tendermint/config/dynamic_config_okchain.go index 7ce1ef5351..d5ce1f60fb 100644 --- a/libs/tendermint/config/dynamic_config_okchain.go +++ b/libs/tendermint/config/dynamic_config_okchain.go @@ -25,6 +25,7 @@ type IDynamicConfig interface { GetEnableWtx() bool GetDeliverTxsExecuteMode() int GetEnableHasBlockPartMsg() bool + GetIavlAcNoBatch() bool } var DynamicConfig IDynamicConfig = MockDynamicConfig{} @@ -116,3 +117,7 @@ func (d MockDynamicConfig) GetDeliverTxsExecuteMode() int { func (d MockDynamicConfig) GetEnableHasBlockPartMsg() bool { return false } + +func (d MockDynamicConfig) GetIavlAcNoBatch() bool { + return false +}