Skip to content

Commit

Permalink
Merge PR: put redis param into init (#1338)
Browse files Browse the repository at this point in the history
* put redis param into init

* del p2p logic

Co-authored-by: xiangjianmeng <[email protected]>
  • Loading branch information
chengzhinei and xiangjianmeng authored Dec 28, 2021
1 parent 2e7e741 commit 38b15c3
Show file tree
Hide file tree
Showing 10 changed files with 13 additions and 85 deletions.
2 changes: 0 additions & 2 deletions cmd/exchaind/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@ func replayCmd(ctx *server.Context) *cobra.Command {
cmd.Flags().BoolVarP(&state.IgnoreSmbCheck, "ignore-smb", "i", false, "ignore state machine broken")
cmd.Flags().Bool(types.FlagDownloadDDS, false, "get delta from dc/redis or not")
cmd.Flags().Bool(types.FlagUploadDDS, false, "send delta to dc/redis or not")
cmd.Flags().Bool(types.FlagApplyP2PDelta, false, "use delta from bcBlockResponseMessage or not")
cmd.Flags().Bool(types.FlagBroadcastP2PDelta, false, "save into deltastore.db, and add delta into bcBlockResponseMessage")
cmd.Flags().String(types.FlagRedisUrl, "localhost:6379", "redis url")
cmd.Flags().String(types.FlagRedisAuth, "", "redis auth")
cmd.Flags().Int(types.FlagRedisExpire, 300, "delta expiration time. unit is second")
Expand Down
2 changes: 0 additions & 2 deletions libs/cosmos-sdk/server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,6 @@ which accepts a path for the resulting pprof file.

cmd.Flags().Bool(tmtypes.FlagDownloadDDS, false, "get delta from dc/redis or not")
cmd.Flags().Bool(tmtypes.FlagUploadDDS, false, "send delta to dc/redis or not")
cmd.Flags().Bool(tmtypes.FlagApplyP2PDelta, false, "use delta from bcBlockResponseMessage or not")
cmd.Flags().Bool(tmtypes.FlagBroadcastP2PDelta, false, "save into deltastore.db, and add delta into bcBlockResponseMessage")
cmd.Flags().String(tmtypes.FlagRedisUrl, "localhost:6379", "redis url")
cmd.Flags().String(tmtypes.FlagRedisAuth, "", "redis auth")
cmd.Flags().Int(tmtypes.FlagRedisExpire, 300, "delta expiration time. unit is second")
Expand Down
2 changes: 1 addition & 1 deletion libs/cosmos-sdk/store/iavl/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (st *Store) GetImmutable(version int64) (*Store, error) {
// version and hash.
func (st *Store) Commit(inDelta *iavl.TreeDelta, deltas []byte) (types.CommitID, iavl.TreeDelta, []byte) {
flag := false
if (tmtypes.EnableApplyP2PDelta() || tmtypes.EnableDownloadDelta()) && len(deltas) != 0 {
if tmtypes.EnableDownloadDelta() && len(deltas) != 0 {
flag = true
st.tree.SetDelta(inDelta)
}
Expand Down
4 changes: 2 additions & 2 deletions libs/cosmos-sdk/store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ func commitStores(version int64, storeMap map[types.StoreKey]types.CommitKVStore
returnedDeltas := map[string]iavltree.TreeDelta{}

var err error
if (tmtypes.EnableApplyP2PDelta() || tmtypes.EnableDownloadDelta()) && len(deltas) != 0 {
if tmtypes.EnableDownloadDelta() && len(deltas) != 0 {
err = itjs.Unmarshal(deltas, &appliedDeltas)
if err != nil {
panic(err)
Expand All @@ -891,7 +891,7 @@ func commitStores(version int64, storeMap map[types.StoreKey]types.CommitKVStore
returnedDeltas[key.Name()] = reDelta
}

if tmtypes.EnableBroadcastP2PDelta() || tmtypes.EnableUploadDelta() {
if tmtypes.EnableUploadDelta() {
deltas, err = itjs.Marshal(returnedDeltas)
if err != nil {
panic(err)
Expand Down
4 changes: 0 additions & 4 deletions libs/tendermint/blockchain/v0/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -584,10 +584,6 @@ func (bpr *bpRequester) setBlock(block *types.Block, deltas *types.Deltas, peerI
}
bpr.block = block

if types.EnableApplyP2PDelta() {
bpr.deltas = deltas
}

bpr.mtx.Unlock()

select {
Expand Down
7 changes: 1 addition & 6 deletions libs/tendermint/blockchain/v0/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,8 @@ func (bcR *BlockchainReactor) respondToPeer(msg *bcBlockRequestMessage,
src p2p.Peer) (queued bool) {

block := bcR.store.LoadBlock(msg.Height)
var deltas *types.Deltas
if types.EnableBroadcastP2PDelta() {
deltas = bcR.dstore.LoadDeltas(msg.Height)
}

if block != nil {
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block, Deltas: deltas})
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block})
return src.TrySend(BlockchainChannel, msgBytes)
}

Expand Down
7 changes: 1 addition & 6 deletions libs/tendermint/blockchain/v1/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,8 @@ func (bcR *BlockchainReactor) sendBlockToPeer(msg *bcBlockRequestMessage,
src p2p.Peer) (queued bool) {

block := bcR.store.LoadBlock(msg.Height)
var deltas *types.Deltas
if types.EnableBroadcastP2PDelta() {
deltas = bcR.dstore.LoadDeltas(msg.Height)
}

if block != nil {
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block, Deltas: deltas})
msgBytes := cdc.MustMarshalBinaryBare(&bcBlockResponseMessage{Block: block})
return src.TrySend(BlockchainChannel, msgBytes)
}

Expand Down
6 changes: 1 addition & 5 deletions libs/tendermint/consensus/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,16 +666,12 @@ func (conR *Reactor) gossipDataForCatchup(logger log.Logger, rs *cstypes.RoundSt
time.Sleep(conR.conS.config.PeerGossipSleepDuration)
return
}
var deltas *types.Deltas
if types.EnableBroadcastP2PDelta() {
deltas = conR.conS.deltaStore.LoadDeltas(prs.Height)
}

// Send the part
msg := &BlockPartMessage{
Height: prs.Height, // Not our height, so it doesn't matter.
Round: prs.Round, // Not our height, so it doesn't matter.
Part: part,
Deltas: deltas,
}
logger.Debug("Sending block part for catchup", "round", prs.Round, "index", index)
if peer.Send(DataChannel, cdc.MustMarshalBinaryBare(msg)) {
Expand Down
7 changes: 5 additions & 2 deletions libs/tendermint/state/execution_dds.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,11 @@ func (dc *DeltaContext) init(l log.Logger) {
)

if dc.uploadDelta || dc.downloadDelta {
dc.deltaBroker = redis_cgi.NewRedisClient(types.RedisUrl(), types.RedisAuth(), types.RedisExpire(), l)
dc.logger.Info("Init delta broker", "url", types.RedisUrl())
url := viper.GetString(types.FlagRedisUrl)
auth := viper.GetString(types.FlagRedisAuth)
expire := time.Duration(viper.GetInt(types.FlagRedisExpire)) * time.Second
dc.deltaBroker = redis_cgi.NewRedisClient(url, auth, expire, l)
dc.logger.Info("Init delta broker", "url", url)
}

// control if iavl produce delta or not
Expand Down
57 changes: 2 additions & 55 deletions libs/tendermint/types/deltas.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,6 @@ import (
)

const (
// use delta from bcBlockResponseMessage or not
FlagApplyP2PDelta = "apply-p2p-delta"
// save into deltastore.db, and add delta into bcBlockResponseMessage
FlagBroadcastP2PDelta = "broadcast-delta"
// get delta from dc/redis
FlagDownloadDDS = "download-delta"
// send delta to dc/redis
Expand All @@ -21,8 +17,10 @@ const (
FlagDDSCompressFlag = "compress-flag"

// redis
// url fmt (ip:port)
FlagRedisUrl = "delta-redis-url"
FlagRedisAuth = "delta-redis-auth"
// expire unit: second
FlagRedisExpire = "delta-redis-expire"

// fast-query
Expand All @@ -35,26 +33,10 @@ const (

var (
fastQuery = false
// fmt (http://ip:port/)
centerUrl = "http://127.0.0.1:8030/"
// fmt (ip:port)
redisUrl = "127.0.0.1:6379"
redisAuth = "auth"
// unit: second
redisExpire = 300

applyP2PDelta = false
broadcatP2PDelta = false
downloadDelta = false
uploadDelta = false

onceFastQuery sync.Once
onceRedisUrl sync.Once
onceRedisAuth sync.Once
onceRedisExpire sync.Once

onceApplyP2P sync.Once
onceBroadcastP2P sync.Once
onceDownload sync.Once
onceUpload sync.Once
)
Expand All @@ -66,20 +48,6 @@ func IsFastQuery() bool {
return fastQuery
}

func EnableApplyP2PDelta() bool {
onceApplyP2P.Do(func() {
applyP2PDelta = viper.GetBool(FlagApplyP2PDelta)
})
return applyP2PDelta
}

func EnableBroadcastP2PDelta() bool {
onceBroadcastP2P.Do(func() {
broadcatP2PDelta = viper.GetBool(FlagBroadcastP2PDelta)
})
return broadcatP2PDelta
}

func EnableDownloadDelta() bool {
onceDownload.Do(func() {
downloadDelta = viper.GetBool(FlagDownloadDDS)
Expand All @@ -94,27 +62,6 @@ func EnableUploadDelta() bool {
return uploadDelta
}

func RedisUrl() string {
onceRedisUrl.Do(func() {
redisUrl = viper.GetString(FlagRedisUrl)
})
return redisUrl
}

func RedisAuth() string {
onceRedisAuth.Do(func() {
redisAuth = viper.GetString(FlagRedisAuth)
})
return redisAuth
}

func RedisExpire() time.Duration {
onceRedisExpire.Do(func() {
redisExpire = viper.GetInt(FlagRedisExpire)
})
return time.Duration(redisExpire) * time.Second
}

type DeltasMessage struct {
Metadata []byte `json:"metadata"`
Height int64 `json:"height"`
Expand Down

0 comments on commit 38b15c3

Please sign in to comment.