From 7ad07fb40f12094ff8aa62a226321e5116e49b1e Mon Sep 17 00:00:00 2001 From: MengXiangJian <805442788@qq.com> Date: Tue, 15 Jun 2021 16:21:43 +0800 Subject: [PATCH] Merge PR: fix generate bloom filter failed occuently (#903) * fix generate failed occuently * remove defer for ProcessSection * bump version to v0.18.7.9 * fix generate bloom filter failed occuently * change version to v0.18.8 Co-authored-by: evan.han --- Makefile | 2 +- cmd/client/flags.go | 4 ++-- cmd/exchaincli/main.go | 3 +-- x/evm/keeper/abci.go | 13 ++++++++--- x/evm/keeper/keeper.go | 4 +--- x/evm/types/indexer.go | 50 ++++++++++++++++++++++++++++++++---------- 6 files changed, 54 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index fa6ceb66f3..37ef9be0e0 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,7 @@ export GO111MODULE=on GithubTop=github.com -Version=v0.18.7 +Version=v0.18.8 CosmosSDK=v0.39.2 Tendermint=v0.33.9 Iavl=v0.14.3 diff --git a/cmd/client/flags.go b/cmd/client/flags.go index 0562e15da4..3ae7ac59f6 100644 --- a/cmd/client/flags.go +++ b/cmd/client/flags.go @@ -15,8 +15,8 @@ func RegisterAppFlag(cmd *cobra.Command) { cmd.Flags().Bool(watcher.FlagFastQuery, false, "Enable the fast query mode for rpc queries") cmd.Flags().Int(watcher.FlagFastQueryLru, 1000, "Set the size of LRU cache under fast-query mode") cmd.Flags().Bool(rpc.FlagPersonalAPI, true, "Enable the personal_ prefixed set of APIs in the Web3 JSON-RPC spec") - cmd.Flags().Bool(evmtypes.FlagEnableBloomFilter, true, "Enable bloom filter for event logs") - cmd.Flags().Int64(filters.FlagGetLogsHeightSpan, -1, "config the block height span for get logs") + cmd.Flags().Bool(evmtypes.FlagEnableBloomFilter, false, "Enable bloom filter for event logs") + cmd.Flags().Int64(filters.FlagGetLogsHeightSpan, 2000, "config the block height span for get logs") cmd.Flags().String(stream.NacosTmrpcUrls, "", "Stream plugin`s nacos server urls for discovery service of tendermint rpc") cmd.Flags().String(stream.NacosTmrpcNamespaceID, "", "Stream plugin`s nacos namepace id for discovery service of tendermint rpc") cmd.Flags().String(stream.NacosTmrpcAppName, "", "Stream plugin`s tendermint rpc name in eureka or nacos") diff --git a/cmd/exchaincli/main.go b/cmd/exchaincli/main.go index 85477e8620..ec9b107a77 100644 --- a/cmd/exchaincli/main.go +++ b/cmd/exchaincli/main.go @@ -28,14 +28,13 @@ import ( "github.com/cosmos/cosmos-sdk/x/auth" authcmd "github.com/cosmos/cosmos-sdk/x/auth/client/cli" "github.com/cosmos/cosmos-sdk/x/bank" - tokencmd "github.com/okex/exchain/x/token/client/cli" - "github.com/okex/exchain/app" "github.com/okex/exchain/app/codec" "github.com/okex/exchain/app/crypto/ethsecp256k1" "github.com/okex/exchain/app/rpc" okexchain "github.com/okex/exchain/app/types" "github.com/okex/exchain/cmd/client" + tokencmd "github.com/okex/exchain/x/token/client/cli" ) var ( diff --git a/x/evm/keeper/abci.go b/x/evm/keeper/abci.go index 80ea1976b9..3349c08e3a 100644 --- a/x/evm/keeper/abci.go +++ b/x/evm/keeper/abci.go @@ -69,9 +69,16 @@ func (k Keeper) EndBlock(ctx sdk.Context, req abci.RequestEndBlock) []abci.Valid // the hash of current block is stored when executing BeginBlock of next block. // so update section in the next block. if indexer := types.GetIndexer(); indexer != nil { - interval := uint64(req.Height - tmtypes.GetStartBlockHeight()) - if interval >= (indexer.GetValidSections()+1)*types.BloomBitsBlocks && !types.GetIndexer().IsProcessing() { - go types.GetIndexer().ProcessSection(ctx, k, interval) + if types.GetIndexer().IsProcessing() { + // notify new height + go func() { + indexer.NotifyNewHeight(ctx) + }() + } else { + interval := uint64(req.Height - tmtypes.GetStartBlockHeight()) + if interval >= (indexer.GetValidSections()+1)*types.BloomBitsBlocks { + go types.GetIndexer().ProcessSection(ctx, k, interval) + } } } } diff --git a/x/evm/keeper/keeper.go b/x/evm/keeper/keeper.go index d5c037d86d..f8ff030f2e 100644 --- a/x/evm/keeper/keeper.go +++ b/x/evm/keeper/keeper.go @@ -17,7 +17,6 @@ import ( "github.com/okex/exchain/x/evm/types" "github.com/okex/exchain/x/evm/watcher" "github.com/okex/exchain/x/params" - "github.com/spf13/viper" "github.com/tendermint/tendermint/libs/log" ) @@ -60,8 +59,7 @@ func NewKeeper( paramSpace = paramSpace.WithKeyTable(types.ParamKeyTable()) } - if enable := viper.GetBool(types.FlagEnableBloomFilter); enable { - types.SetEnableBloomFilter(enable) + if enable := types.GetEnableBloomFilter(); enable { db := types.BloomDb() types.InitIndexer(db) } diff --git a/x/evm/types/indexer.go b/x/evm/types/indexer.go index c608468f0c..4e155763e1 100644 --- a/x/evm/types/indexer.go +++ b/x/evm/types/indexer.go @@ -10,12 +10,14 @@ import ( tmtypes "github.com/tendermint/tendermint/types" dbm "github.com/tendermint/tm-db" "path/filepath" + "sync" "sync/atomic" ) var ( indexer *Indexer enableBloomFilter bool + once sync.Once ) type Keeper interface { @@ -32,13 +34,12 @@ func init() { } func GetEnableBloomFilter() bool { + once.Do(func() { + enableBloomFilter = viper.GetBool(FlagEnableBloomFilter) + }) return enableBloomFilter } -func SetEnableBloomFilter(enable bool) { - enableBloomFilter = enable -} - // Indexer does a post-processing job for equally sized sections of the // canonical chain (like BlooomBits and CHT structures). A Indexer is // connected to the blockchain through the event system by starting a @@ -51,8 +52,8 @@ func SetEnableBloomFilter(enable bool) { type Indexer struct { backend bloomIndexer // Background processor generating the index data content - update chan struct{} // Notification channel that headers should be processed - quit chan struct{} // Quit channel to tear down running goroutines + update chan sdk.Context // Notification channel that headers should be processed + quit chan struct{} // Quit channel to tear down running goroutines storedSections uint64 // Number of sections successfully indexed into the database processing uint32 // Atomic flag whether indexer is processing or not @@ -65,7 +66,7 @@ func InitIndexer(db dbm.DB) { indexer = &Indexer{ backend: initBloomIndexer(db), - update: make(chan struct{}), + update: make(chan sdk.Context), quit: make(chan struct{}), } indexer.setValidSections(indexer.GetValidSections()) @@ -93,14 +94,19 @@ func (i *Indexer) StoredSection() uint64 { } func (i *Indexer) IsProcessing() bool { - return i.processing == 1 + return atomic.LoadUint32(&i.processing) == 1 } func (i *Indexer) ProcessSection(ctx sdk.Context, k Keeper, interval uint64) { if atomic.SwapUint32(&i.processing, 1) == 1 { - ctx.Logger().Error("matcher already running") + ctx.Logger().Error("matcher is already running") return } + defer func() { + if r := recover(); r != nil { + ctx.Logger().Error("ProcessSection panic height", ctx.BlockHeight(), r) + } + }() defer atomic.StoreUint32(&i.processing, 0) knownSection := interval / BloomBitsBlocks for i.storedSections < knownSection { @@ -126,6 +132,7 @@ func (i *Indexer) ProcessSection(ctx sdk.Context, k Keeper, interval uint64) { bloom ethtypes.Bloom hash common.Hash ) + ctx = i.updateCtx(ctx) // the initial height is 1 but it on ethereum is 0. so set the bloom and hash of the block 0 to empty. if number == uint64(tmtypes.GetStartBlockHeight()) { bloom = ethtypes.Bloom{} @@ -176,7 +183,7 @@ func (i *Indexer) setValidSections(sections uint64) { i.storedSections = sections // needed if new > old } -// loadValidSections reads the number of valid sections from the index database +// GetValidSections reads the number of valid sections from the index database // and caches is into the local state. func (i *Indexer) GetValidSections() uint64 { data, _ := i.backend.db.Get([]byte("count")) @@ -186,7 +193,7 @@ func (i *Indexer) GetValidSections() uint64 { return 0 } -// SectionHead retrieves the last block hash of a processed section from the +// sectionHead retrieves the last block hash of a processed section from the // index database. func (i *Indexer) sectionHead(section uint64) common.Hash { var data [8]byte @@ -216,3 +223,24 @@ func (i *Indexer) removeSectionHead(section uint64) { i.backend.db.Delete(append([]byte("shead"), data[:]...)) } + +func (i *Indexer) NotifyNewHeight(ctx sdk.Context) { + i.update <- ctx +} + +func (i *Indexer) updateCtx(oldCtx sdk.Context) sdk.Context { + newCtx := oldCtx + exit := false + for { + select { + case newCtx = <-i.update: + default: + exit = true + } + if exit { + break + } + } + + return newCtx +} \ No newline at end of file