Skip to content

Commit

Permalink
Merge PR: optimize db batch in async commit (#2858)
Browse files Browse the repository at this point in the history
* mv saveOrphans to commit event

* save orphans to db

* save node to db

* flag

* update

* update

* update ac log

* nodeToDBValue

* pruning with single batch

* test

* revert

* rm fss writeToDB

* rm fss writeToDB

* rm fss writeToDB

* reduce diff

* use batch if stoptree

* when nobatch is turned off, keep the same as before

* fix ut

* dynamic config

* rm unused code

Co-authored-by: xiangjianmeng <[email protected]>
  • Loading branch information
cwbhhjl and xiangjianmeng authored Dec 13, 2022
1 parent 718fe1c commit a197d90
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 30 deletions.
19 changes: 19 additions & 0 deletions app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ type OecConfig struct {
// enable broadcast hasBlockPartMsg
enableHasBlockPartMsg bool
gcInterval int

iavlAcNoBatch bool
}

const (
Expand Down Expand Up @@ -296,6 +298,7 @@ func (c *OecConfig) loadFromConfig() {
c.SetBlockPartSize(viper.GetInt(server.FlagBlockPartSizeBytes))
c.SetEnableHasBlockPartMsg(viper.GetBool(FlagEnableHasBlockPartMsg))
c.SetGcInterval(viper.GetInt(FlagDebugGcInterval))
c.SetIavlAcNoBatch(viper.GetBool(tmiavl.FlagIavlCommitAsyncNoBatch))
}

func resolveNodeKeyWhitelist(plain string) []string {
Expand Down Expand Up @@ -366,6 +369,7 @@ func (c *OecConfig) format() string {
iavl-fast-storage-cache-size: %d
commit-gap-height: %d
enable-analyzer: %v
iavl-commit-async-no-batch: %v
active-view-change: %v`, system.ChainName,
c.GetMempoolRecheck(),
c.GetMempoolForceRecheckGap(),
Expand Down Expand Up @@ -393,6 +397,7 @@ func (c *OecConfig) format() string {
c.GetIavlFSCacheSize(),
c.GetCommitGapHeight(),
c.GetEnableAnalyzer(),
c.GetIavlAcNoBatch(),
c.GetActiveVC(),
)
}
Expand Down Expand Up @@ -624,6 +629,12 @@ func (c *OecConfig) updateFromKVStr(k, v string) {
return
}
c.SetGcInterval(r)
case tmiavl.FlagIavlCommitAsyncNoBatch:
r, err := strconv.ParseBool(v)
if err != nil {
return
}
c.SetIavlAcNoBatch(r)
}
}

Expand Down Expand Up @@ -1012,3 +1023,11 @@ func (c *OecConfig) GetEnableHasBlockPartMsg() bool {
func (c *OecConfig) SetEnableHasBlockPartMsg(value bool) {
c.enableHasBlockPartMsg = value
}

func (c *OecConfig) GetIavlAcNoBatch() bool {
return c.iavlAcNoBatch
}

func (c *OecConfig) SetIavlAcNoBatch(value bool) {
c.iavlAcNoBatch = value
}
1 change: 1 addition & 0 deletions libs/cosmos-sdk/server/start_okchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ func RegisterServerFlags(cmd *cobra.Command) *cobra.Command {

cmd.Flags().Int(iavl.FlagIavlCacheSize, 10000000, "Max size of iavl cache")
cmd.Flags().Float64(tmiavl.FlagIavlCacheInitRatio, 1, "iavl cache init ratio, 0.0~1.0, default is 0, iavl cache map would be init with (cache size * init ratio)")
cmd.Flags().Bool(tmiavl.FlagIavlCommitAsyncNoBatch, false, "experimental: iavl commit async without batch")
cmd.Flags().StringToInt(tmiavl.FlagOutputModules, map[string]int{"evm": 1, "acc": 1}, "decide which module in iavl to be printed")
cmd.Flags().Int64(tmiavl.FlagIavlCommitIntervalHeight, 100, "Max interval to commit node cache into leveldb")
cmd.Flags().Int64(tmiavl.FlagIavlMinCommitItemCount, 1000000, "Min nodes num to triggle node cache commit")
Expand Down
4 changes: 4 additions & 0 deletions libs/cosmos-sdk/store/rootmulti/rootmulti_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"strings"
"sync"

cfg "github.com/okex/exchain/libs/tendermint/config"

sdkmaps "github.com/okex/exchain/libs/cosmos-sdk/store/internal/maps"
"github.com/okex/exchain/libs/cosmos-sdk/store/mem"
"github.com/okex/exchain/libs/cosmos-sdk/store/mpt"
Expand Down Expand Up @@ -593,6 +595,8 @@ func (rs *Store) CommitterCommit(*iavltree.TreeDelta) (_ types.CommitID, _ *iavl

// Implements Committer/CommitStore.
func (rs *Store) CommitterCommitMap(inputDeltaMap iavltree.TreeDeltaMap) (types.CommitID, iavltree.TreeDeltaMap) {
iavltree.IavlCommitAsyncNoBatch = cfg.DynamicConfig.GetIavlAcNoBatch()

previousHeight := rs.lastCommitInfo.Version
version := previousHeight + 1

Expand Down
5 changes: 4 additions & 1 deletion libs/iavl/mutable_tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -973,13 +973,16 @@ func (ndb *nodeDB) saveFastNodeVersion(batch dbm.Batch, fnc *fastNodeChanges, ve
if !GetEnableFastStorage() || fnc == nil {
return nil
}
if err := ndb.setFastStorageVersionToBatch(batch, version); err != nil {
return err
}
if err := ndb.saveFastNodeAdditions(batch, fnc.getAdditions()); err != nil {
return err
}
if err := ndb.saveFastNodeRemovals(batch, fnc.getRemovals()); err != nil {
return err
}
return ndb.setFastStorageVersionToBatch(batch, version)
return nil
}

// nolint: unused
Expand Down
2 changes: 1 addition & 1 deletion libs/iavl/mutable_tree_brenchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func prepareTree(b *testing.B, openLogFlag bool, dbName string, size int) (*Muta
//recursivePrint(tree.root, 0)

tree.SaveVersion(false)
tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil}
tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil, nil, false}
fmt.Println("init setting done")
return tree, keySet, dataSet
}
Expand Down
41 changes: 32 additions & 9 deletions libs/iavl/mutable_tree_oec.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type commitEvent struct {
wg *sync.WaitGroup
iavlHeight int
fnc *fastNodeChanges
orphans []commitOrphan
isStop bool
}

type commitOrphan struct {
Expand Down Expand Up @@ -159,11 +161,14 @@ func (tree *MutableTree) removeVersion(version int64) {
func (tree *MutableTree) persist(version int64) {
var err error
batch := tree.NewBatch()
tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil}
tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil, nil, false}
var tpp map[string]*Node = nil
fnc := newFastNodeChanges()

var orphans []commitOrphan
if EnablePruningHistoryState {
tree.ndb.saveCommitOrphans(batch, version, tree.commitOrphans)
orphans = tree.commitOrphans
tree.commitOrphans = nil
}
if tree.root == nil {
// There can still be orphans, for example if the root is the node being removed.
Expand All @@ -183,7 +188,7 @@ func (tree *MutableTree) persist(version int64) {
}
versions := tree.deepCopyVersions()
tree.commitCh <- commitEvent{version, versions, batch,
tpp, nil, int(tree.Height()), fnc}
tpp, nil, int(tree.Height()), fnc, orphans, false}
tree.lastPersistHeight = version
}

Expand All @@ -201,9 +206,20 @@ func (tree *MutableTree) commitSchedule() {
}
continue
}

noBatch := false
if IavlCommitAsyncNoBatch && !event.isStop {
noBatch = true
}
trc := trace.NewTracer("commitSchedule")

if len(event.orphans) != 0 {
trc.Pin("saveCommitOrphans")
err := tree.ndb.saveCommitOrphans(event.batch, event.version, event.orphans, noBatch)
if err != nil {
panic(err)
}
}

trc.Pin("cacheNode")
for k, node := range event.tpp {
if !node.persisted {
Expand All @@ -213,9 +229,9 @@ func (tree *MutableTree) commitSchedule() {
}

trc.Pin("Pruning")
tree.updateCommittedStateHeightPool(event.batch, event.version, event.versions)
tree.updateCommittedStateHeightPool(event.batch, event.version, event.versions, noBatch)

tree.ndb.persistTpp(&event, trc)
tree.ndb.persistTpp(&event, noBatch, trc)
if event.wg != nil {
event.wg.Done()
break
Expand Down Expand Up @@ -278,7 +294,7 @@ func (tree *MutableTree) StopTreeWithVersion(version int64) {
wg.Add(1)
versions := tree.deepCopyVersions()

tree.commitCh <- commitEvent{tree.version, versions, batch, tpp, &wg, 0, fastNodeChanges}
tree.commitCh <- commitEvent{tree.version, versions, batch, tpp, &wg, 0, fastNodeChanges, nil, true}
wg.Wait()
}
func (tree *MutableTree) StopTree() {
Expand All @@ -289,7 +305,7 @@ func (tree *MutableTree) log(level int, msg string, kvs ...interface{}) {
iavlLog(tree.GetModuleName(), level, msg, kvs...)
}

func (tree *MutableTree) updateCommittedStateHeightPool(batch dbm.Batch, version int64, versions map[int64]bool) {
func (tree *MutableTree) updateCommittedStateHeightPool(batch dbm.Batch, version int64, versions map[int64]bool, writeToDB bool) {
queue := tree.committedHeightQueue
queue.PushBack(version)
tree.committedHeightMap[version] = true
Expand All @@ -300,13 +316,20 @@ func (tree *MutableTree) updateCommittedStateHeightPool(batch dbm.Batch, version
delete(tree.committedHeightMap, oldVersion)

if EnablePruningHistoryState {

if writeToDB {
batch = tree.ndb.db.NewBatch()
}
if err := tree.deleteVersion(batch, oldVersion, versions); err != nil {
tree.log(IavlErr, "Failed to delete", "height", oldVersion, "error", err.Error())
} else {
tree.log(IavlDebug, "History state removed", "version", oldVersion)
tree.removedVersions.Store(oldVersion, nil)
}
if writeToDB {
if err := tree.ndb.Commit(batch); err != nil {
panic(err)
}
}
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions libs/iavl/mutable_tree_oec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestPruningHistoryState(t *testing.T) {

batchSaveVersion(t, tree, minHistoryStateNum*int(CommitIntervalHeight)-2)

tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil}
tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil, nil, false}

iTree, err := tree.GetImmutable(CommitIntervalHeight * (minHistoryStateNum - 1))
require.NoError(t, err)
Expand Down Expand Up @@ -350,7 +350,7 @@ func TestPruningHistoryStateRandom(t *testing.T) {
require.NoError(t, err)
}

tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil}
tree.commitCh <- commitEvent{-1, nil, nil, nil, nil, 0, nil, nil, false}

nodeCount := 0
tree.ndb.traverseNodes(func(hash []byte, node *Node) {
Expand Down Expand Up @@ -538,9 +538,9 @@ func TestCommitSchedule(t *testing.T) {
wg.Add(1)
versions := tree.deepCopyVersions()
batch := tree.NewBatch()
tree.commitCh <- commitEvent{CommitIntervalHeight, versions, batch, nil, nil, 0, nil}
tree.commitCh <- commitEvent{CommitIntervalHeight, versions, batch, nil, nil, 0, nil, nil, false}

tree.commitCh <- commitEvent{CommitIntervalHeight, versions, batch, nil, &wg, 0, nil}
tree.commitCh <- commitEvent{CommitIntervalHeight, versions, batch, nil, &wg, 0, nil, nil, false}
wg.Wait()

}
8 changes: 8 additions & 0 deletions libs/iavl/nodedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,14 @@ func (ndb *nodeDB) saveOrphan(batch dbm.Batch, hash []byte, fromVersion, toVersi
batch.Set(key, hash)
}

func (ndb *nodeDB) saveOrphanToDB(hash []byte, fromVersion, toVersion int64) error {
if fromVersion > toVersion {
panic(fmt.Sprintf("Orphan expires before it comes alive. %d > %d", fromVersion, toVersion))
}
key := ndb.orphanKey(fromVersion, toVersion, hash)
return ndb.db.Set(key, hash)
}

func (ndb *nodeDB) log(level int, msg string, kv ...interface{}) {
iavlLog(ndb.name, level, msg, kv...)
}
Expand Down
Loading

0 comments on commit a197d90

Please sign in to comment.