Skip to content

Commit

Permalink
Merge PR: fix generate bloom filter failed occuently (#903)
Browse files Browse the repository at this point in the history
* fix generate failed occuently

* remove defer for ProcessSection

* bump version to v0.18.7.9

* fix generate bloom filter failed occuently

* change version to v0.18.8

Co-authored-by: evan.han <[email protected]>
  • Loading branch information
xiangjianmeng and ilovers authored Jun 15, 2021
1 parent e5ab93b commit 7ad07fb
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ export GO111MODULE=on
GithubTop=github.com


Version=v0.18.7
Version=v0.18.8
CosmosSDK=v0.39.2
Tendermint=v0.33.9
Iavl=v0.14.3
Expand Down
4 changes: 2 additions & 2 deletions cmd/client/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ func RegisterAppFlag(cmd *cobra.Command) {
cmd.Flags().Bool(watcher.FlagFastQuery, false, "Enable the fast query mode for rpc queries")
cmd.Flags().Int(watcher.FlagFastQueryLru, 1000, "Set the size of LRU cache under fast-query mode")
cmd.Flags().Bool(rpc.FlagPersonalAPI, true, "Enable the personal_ prefixed set of APIs in the Web3 JSON-RPC spec")
cmd.Flags().Bool(evmtypes.FlagEnableBloomFilter, true, "Enable bloom filter for event logs")
cmd.Flags().Int64(filters.FlagGetLogsHeightSpan, -1, "config the block height span for get logs")
cmd.Flags().Bool(evmtypes.FlagEnableBloomFilter, false, "Enable bloom filter for event logs")
cmd.Flags().Int64(filters.FlagGetLogsHeightSpan, 2000, "config the block height span for get logs")
cmd.Flags().String(stream.NacosTmrpcUrls, "", "Stream plugin`s nacos server urls for discovery service of tendermint rpc")
cmd.Flags().String(stream.NacosTmrpcNamespaceID, "", "Stream plugin`s nacos namepace id for discovery service of tendermint rpc")
cmd.Flags().String(stream.NacosTmrpcAppName, "", "Stream plugin`s tendermint rpc name in eureka or nacos")
Expand Down
3 changes: 1 addition & 2 deletions cmd/exchaincli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@ import (
"github.com/cosmos/cosmos-sdk/x/auth"
authcmd "github.com/cosmos/cosmos-sdk/x/auth/client/cli"
"github.com/cosmos/cosmos-sdk/x/bank"
tokencmd "github.com/okex/exchain/x/token/client/cli"

"github.com/okex/exchain/app"
"github.com/okex/exchain/app/codec"
"github.com/okex/exchain/app/crypto/ethsecp256k1"
"github.com/okex/exchain/app/rpc"
okexchain "github.com/okex/exchain/app/types"
"github.com/okex/exchain/cmd/client"
tokencmd "github.com/okex/exchain/x/token/client/cli"
)

var (
Expand Down
13 changes: 10 additions & 3 deletions x/evm/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,16 @@ func (k Keeper) EndBlock(ctx sdk.Context, req abci.RequestEndBlock) []abci.Valid
// the hash of current block is stored when executing BeginBlock of next block.
// so update section in the next block.
if indexer := types.GetIndexer(); indexer != nil {
interval := uint64(req.Height - tmtypes.GetStartBlockHeight())
if interval >= (indexer.GetValidSections()+1)*types.BloomBitsBlocks && !types.GetIndexer().IsProcessing() {
go types.GetIndexer().ProcessSection(ctx, k, interval)
if types.GetIndexer().IsProcessing() {
// notify new height
go func() {
indexer.NotifyNewHeight(ctx)
}()
} else {
interval := uint64(req.Height - tmtypes.GetStartBlockHeight())
if interval >= (indexer.GetValidSections()+1)*types.BloomBitsBlocks {
go types.GetIndexer().ProcessSection(ctx, k, interval)
}
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions x/evm/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/okex/exchain/x/evm/types"
"github.com/okex/exchain/x/evm/watcher"
"github.com/okex/exchain/x/params"
"github.com/spf13/viper"
"github.com/tendermint/tendermint/libs/log"
)

Expand Down Expand Up @@ -60,8 +59,7 @@ func NewKeeper(
paramSpace = paramSpace.WithKeyTable(types.ParamKeyTable())
}

if enable := viper.GetBool(types.FlagEnableBloomFilter); enable {
types.SetEnableBloomFilter(enable)
if enable := types.GetEnableBloomFilter(); enable {
db := types.BloomDb()
types.InitIndexer(db)
}
Expand Down
50 changes: 39 additions & 11 deletions x/evm/types/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
tmtypes "github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tm-db"
"path/filepath"
"sync"
"sync/atomic"
)

var (
indexer *Indexer
enableBloomFilter bool
once sync.Once
)

type Keeper interface {
Expand All @@ -32,13 +34,12 @@ func init() {
}

func GetEnableBloomFilter() bool {
once.Do(func() {
enableBloomFilter = viper.GetBool(FlagEnableBloomFilter)
})
return enableBloomFilter
}

func SetEnableBloomFilter(enable bool) {
enableBloomFilter = enable
}

// Indexer does a post-processing job for equally sized sections of the
// canonical chain (like BlooomBits and CHT structures). A Indexer is
// connected to the blockchain through the event system by starting a
Expand All @@ -51,8 +52,8 @@ func SetEnableBloomFilter(enable bool) {
type Indexer struct {
backend bloomIndexer // Background processor generating the index data content

update chan struct{} // Notification channel that headers should be processed
quit chan struct{} // Quit channel to tear down running goroutines
update chan sdk.Context // Notification channel that headers should be processed
quit chan struct{} // Quit channel to tear down running goroutines

storedSections uint64 // Number of sections successfully indexed into the database
processing uint32 // Atomic flag whether indexer is processing or not
Expand All @@ -65,7 +66,7 @@ func InitIndexer(db dbm.DB) {

indexer = &Indexer{
backend: initBloomIndexer(db),
update: make(chan struct{}),
update: make(chan sdk.Context),
quit: make(chan struct{}),
}
indexer.setValidSections(indexer.GetValidSections())
Expand Down Expand Up @@ -93,14 +94,19 @@ func (i *Indexer) StoredSection() uint64 {
}

func (i *Indexer) IsProcessing() bool {
return i.processing == 1
return atomic.LoadUint32(&i.processing) == 1
}

func (i *Indexer) ProcessSection(ctx sdk.Context, k Keeper, interval uint64) {
if atomic.SwapUint32(&i.processing, 1) == 1 {
ctx.Logger().Error("matcher already running")
ctx.Logger().Error("matcher is already running")
return
}
defer func() {
if r := recover(); r != nil {
ctx.Logger().Error("ProcessSection panic height", ctx.BlockHeight(), r)
}
}()
defer atomic.StoreUint32(&i.processing, 0)
knownSection := interval / BloomBitsBlocks
for i.storedSections < knownSection {
Expand All @@ -126,6 +132,7 @@ func (i *Indexer) ProcessSection(ctx sdk.Context, k Keeper, interval uint64) {
bloom ethtypes.Bloom
hash common.Hash
)
ctx = i.updateCtx(ctx)
// the initial height is 1 but it on ethereum is 0. so set the bloom and hash of the block 0 to empty.
if number == uint64(tmtypes.GetStartBlockHeight()) {
bloom = ethtypes.Bloom{}
Expand Down Expand Up @@ -176,7 +183,7 @@ func (i *Indexer) setValidSections(sections uint64) {
i.storedSections = sections // needed if new > old
}

// loadValidSections reads the number of valid sections from the index database
// GetValidSections reads the number of valid sections from the index database
// and caches is into the local state.
func (i *Indexer) GetValidSections() uint64 {
data, _ := i.backend.db.Get([]byte("count"))
Expand All @@ -186,7 +193,7 @@ func (i *Indexer) GetValidSections() uint64 {
return 0
}

// SectionHead retrieves the last block hash of a processed section from the
// sectionHead retrieves the last block hash of a processed section from the
// index database.
func (i *Indexer) sectionHead(section uint64) common.Hash {
var data [8]byte
Expand Down Expand Up @@ -216,3 +223,24 @@ func (i *Indexer) removeSectionHead(section uint64) {

i.backend.db.Delete(append([]byte("shead"), data[:]...))
}

func (i *Indexer) NotifyNewHeight(ctx sdk.Context) {
i.update <- ctx
}

func (i *Indexer) updateCtx(oldCtx sdk.Context) sdk.Context {
newCtx := oldCtx
exit := false
for {
select {
case newCtx = <-i.update:
default:
exit = true
}
if exit {
break
}
}

return newCtx
}

0 comments on commit 7ad07fb

Please sign in to comment.