From 2f420faec2baa93e1b498e324c5013c755310638 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Mon, 17 Jun 2024 14:36:38 -0400 Subject: [PATCH 1/4] Mempool async processing --- mempool/reactor.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/mempool/reactor.go b/mempool/reactor.go index 5919b8cadd..be69db53dc 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -24,6 +24,8 @@ type Reactor struct { mempool *CListMempool ids *mempoolIDs + peerTxProcesserChan chan *peerIncomingTx + // Semaphores to keep track of how many connections to peers are active for broadcasting // transactions. Each semaphore has a capacity that puts an upper bound on the number of // connections for different groups of peers. @@ -41,6 +43,7 @@ func NewReactor(config *cfg.MempoolConfig, mempool *CListMempool) *Reactor { memR.BaseReactor = *p2p.NewBaseReactor("Mempool", memR) memR.activePersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToPersistentPeers)) memR.activeNonPersistentPeersSemaphore = semaphore.NewWeighted(int64(memR.config.ExperimentalMaxGossipConnectionsToNonPersistentPeers)) + memR.peerTxProcesserChan = make(chan *peerIncomingTx, 10000) return memR } @@ -62,9 +65,15 @@ func (memR *Reactor) OnStart() error { if !memR.config.Broadcast { memR.Logger.Info("Tx broadcasting is disabled") } + + go memR.incomingPacketProcessor() return nil } +func (memR *Reactor) OnStop() { + close(memR.peerTxProcesserChan) +} + // GetChannels implements Reactor by returning the list of channels for this // reactor. func (memR *Reactor) GetChannels() []*p2p.ChannelDescriptor { @@ -134,6 +143,11 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, _ interface{}) { // broadcast routine checks if peer is gone and returns } +type peerIncomingTx struct { + tx *protomem.Txs + peer p2p.Peer +} + // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) Receive(e p2p.Envelope) { @@ -169,6 +183,36 @@ func (memR *Reactor) Receive(e p2p.Envelope) { // broadcasting happens from go routines per peer } +func (memR *Reactor) incomingPacketProcessor() { + for { + pit, chanOpen := <-memR.peerTxProcesserChan + if !chanOpen { + break + } + + protoTxs := pit.tx.GetTxs() + if len(protoTxs) == 0 { + memR.Logger.Error("received empty txs from peer", "src", pit.peer) + continue + } + txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(pit.peer)} + if pit.peer != nil { + txInfo.SenderP2PID = pit.peer.ID() + } + + var err error + for _, tx := range protoTxs { + ntx := types.Tx(tx) + err = memR.mempool.CheckTx(ntx, nil, txInfo) + if errors.Is(err, mempool.ErrTxInCache) { + memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String()) + } else if err != nil { + memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err) + } + } + } +} + // PeerState describes the state of a peer. type PeerState interface { GetHeight() int64 From ed4a769be09384d3943ab1e59ac52a302de6c5c1 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Mon, 17 Jun 2024 15:30:56 -0400 Subject: [PATCH 2/4] Forgot to commit important part --- mempool/reactor.go | 31 +++++++------------------------ 1 file changed, 7 insertions(+), 24 deletions(-) diff --git a/mempool/reactor.go b/mempool/reactor.go index be69db53dc..27be80104b 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -154,33 +154,16 @@ func (memR *Reactor) Receive(e p2p.Envelope) { memR.Logger.Debug("Receive", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) switch msg := e.Message.(type) { case *protomem.Txs: - protoTxs := msg.GetTxs() - if len(protoTxs) == 0 { - memR.Logger.Error("received empty txs from peer", "src", e.Src) - return - } - txInfo := TxInfo{SenderID: memR.ids.GetForPeer(e.Src)} - if e.Src != nil { - txInfo.SenderP2PID = e.Src.ID() - } - - var err error - for _, tx := range protoTxs { - ntx := types.Tx(tx) - err = memR.mempool.CheckTx(ntx, nil, txInfo) - if errors.Is(err, ErrTxInCache) { - // memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String()) - } else if err != nil { - memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err) - } + pit := &peerIncomingTx{ + tx: msg, + peer: e.Src, } + memR.peerTxProcesserChan <- pit default: memR.Logger.Error("unknown message type", "src", e.Src, "chId", e.ChannelID, "msg", e.Message) memR.Switch.StopPeerForError(e.Src, fmt.Errorf("mempool cannot handle message of type: %T", e.Message)) return } - - // broadcasting happens from go routines per peer } func (memR *Reactor) incomingPacketProcessor() { @@ -195,7 +178,7 @@ func (memR *Reactor) incomingPacketProcessor() { memR.Logger.Error("received empty txs from peer", "src", pit.peer) continue } - txInfo := mempool.TxInfo{SenderID: memR.ids.GetForPeer(pit.peer)} + txInfo := TxInfo{SenderID: memR.ids.GetForPeer(pit.peer)} if pit.peer != nil { txInfo.SenderP2PID = pit.peer.ID() } @@ -204,8 +187,8 @@ func (memR *Reactor) incomingPacketProcessor() { for _, tx := range protoTxs { ntx := types.Tx(tx) err = memR.mempool.CheckTx(ntx, nil, txInfo) - if errors.Is(err, mempool.ErrTxInCache) { - memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String()) + if errors.Is(err, ErrTxInCache) { + // memR.Logger.Debug("Tx already exists in cache", "tx", ntx.String()) } else if err != nil { memR.Logger.Info("Could not check tx", "tx", ntx.String(), "err", err) } From 3b268d2c7e6cbc068b81e90fa56379d343a6c005 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Mon, 19 Aug 2024 16:23:06 -0400 Subject: [PATCH 3/4] Add changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e648c749d..c3b0882420 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ It also includes a few other bug fixes and performance improvements. point to their enclosing for loop label to exit ([\#3544](https://github.com/cometbft/cometbft/issues/3544)) - [#91](https://github.com/osmosis-labs/cometbft/pull/91) perf(consensus): Minor improvement by making add vote only do one peer set mutex call, not 3 (#3156) +* [#109](https://github.com/osmosis-labs/cometbft/pull/109) perf(p2p,mempool): Make mempool reactor receive not block. (Fixed by either #3209, #3230) + ## v0.38.10 From 201fd7d1134bf1e8a4e97f5e44e15ada001e530d Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Mon, 19 Aug 2024 16:27:37 -0400 Subject: [PATCH 4/4] Fix race --- mempool/reactor.go | 1 - 1 file changed, 1 deletion(-) diff --git a/mempool/reactor.go b/mempool/reactor.go index 27be80104b..3f40181bc3 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -71,7 +71,6 @@ func (memR *Reactor) OnStart() error { } func (memR *Reactor) OnStop() { - close(memR.peerTxProcesserChan) } // GetChannels implements Reactor by returning the list of channels for this