Skip to content

Commit

Permalink
Merge PR :fix mempool mutex (#1244)
Browse files Browse the repository at this point in the history
* fix mempool mutex

* fix mempool addAndSortTx

* format code

Co-authored-by: KamiD <[email protected]>
  • Loading branch information
ilovers and KamiD authored Dec 2, 2021
1 parent 95dcd63 commit fb9021b
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 6 deletions.
4 changes: 3 additions & 1 deletion libs/tendermint/libs/clist/clist.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func (l *CList) PushBack(v interface{}) *CElement {
func (l *CList) Remove(e *CElement) interface{} {
l.mtx.Lock()
if e.removed {
e.mtx.Unlock()
l.mtx.Unlock()
return e.Value
}
prev := e.Prev()
Expand Down Expand Up @@ -477,6 +477,8 @@ func (l *CList) InsertElement(ele *CElement) *CElement {
if ele.Nonce < cur.Nonce {
// small Nonce put ahead
cur = cur.prev
} else if ele.Nonce == cur.Nonce {
panic(fmt.Sprintf("should not happen: insert same nonce transactions, ele=%+v,cur=%+v", ele, cur))
} else {
// The tx of the same Nonce has been processed in checkElement, and there are only cases of big nonce
// Big Nonce’s transaction, regardless of gasPrice, has to be in the back
Expand Down
9 changes: 4 additions & 5 deletions libs/tendermint/mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ type CListMempool struct {

metrics *Metrics

addressRecord *AddressRecord
checkRepeatedMtx sync.Mutex
addressRecord *AddressRecord
addAndSortMtx sync.Mutex

pendingPool *PendingPool
accountRetriever AccountRetriever
Expand Down Expand Up @@ -412,6 +412,8 @@ func (mem *CListMempool) reqResCb(
// Called from:
// - resCbFirstTime (lock not held) if tx is valid
func (mem *CListMempool) addAndSortTx(memTx *mempoolTx, info ExTxInfo) error {
mem.addAndSortMtx.Lock()
defer mem.addAndSortMtx.Unlock()
// Delete the same Nonce transaction from the same account
if res := mem.checkRepeatedElement(info); res == -1 {
return errors.New(fmt.Sprintf("Failed to replace tx for acccount %s with nonce %d, "+
Expand Down Expand Up @@ -839,7 +841,6 @@ func (mem *CListMempool) Update(
ele := e.(*clist.CElement)
addr = ele.Address
nonce = ele.Nonce

mem.removeTx(tx, ele, false)
mem.logger.Debug("Mempool update", "address", ele.Address, "nonce", ele.Nonce)
} else if mem.txInfoparser != nil {
Expand Down Expand Up @@ -960,8 +961,6 @@ func (mem *CListMempool) reOrgTxs(addr string) *CListMempool {
}

func (mem *CListMempool) checkRepeatedElement(info ExTxInfo) int {
mem.checkRepeatedMtx.Lock()
defer mem.checkRepeatedMtx.Unlock()
repeatElement := 0
if userMap, ok := mem.addressRecord.GetItem(info.Sender); ok {
for _, node := range userMap {
Expand Down
52 changes: 52 additions & 0 deletions libs/tendermint/mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -901,3 +902,54 @@ func TestMultiPriceBump(t *testing.T) {
require.True(t, tt.targetPrice.Cmp(MultiPriceBump(tt.rawPrice, int64(tt.priceBump))) == 0)
}
}

func TestAddAndSortTxConcurrency(t *testing.T) {
app := kvstore.NewApplication()
cc := proxy.NewLocalClientCreator(app)
config := cfg.ResetTestRoot("mempool_test")
config.Mempool.SortTxByGp = true
mempool, cleanup := newMempoolWithAppAndConfig(cc, config)
defer cleanup()

//tx := &mempoolTx{height: 1, gasWanted: 1, tx:[]byte{0x01}}
type Case struct {
Tx *mempoolTx
Info ExTxInfo
}

testCases := []Case{
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("1")}, ExTxInfo{"1", 0, big.NewInt(3780), 0}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("2")}, ExTxInfo{"1", 0, big.NewInt(3245), 1}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("3")}, ExTxInfo{"1", 0, big.NewInt(5315), 2}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("4")}, ExTxInfo{"1", 0, big.NewInt(4526), 3}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("5")}, ExTxInfo{"1", 0, big.NewInt(2140), 4}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("6")}, ExTxInfo{"1", 0, big.NewInt(4227), 5}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("7")}, ExTxInfo{"2", 0, big.NewInt(2161), 0}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("8")}, ExTxInfo{"2", 0, big.NewInt(5740), 1}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("9")}, ExTxInfo{"2", 0, big.NewInt(6574), 2}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("10")}, ExTxInfo{"2", 0, big.NewInt(9630), 3}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("11")}, ExTxInfo{"2", 0, big.NewInt(6554), 4}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("12")}, ExTxInfo{"2", 0, big.NewInt(5609), 2}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("13")}, ExTxInfo{"3", 0, big.NewInt(2791), 0}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("14")}, ExTxInfo{"3", 0, big.NewInt(2698), 1}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("15")}, ExTxInfo{"2", 0, big.NewInt(6925), 3}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("16")}, ExTxInfo{"1", 0, big.NewInt(4171), 3}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("17")}, ExTxInfo{"1", 0, big.NewInt(2965), 2}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("18")}, ExTxInfo{"3", 0, big.NewInt(2484), 2}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("19")}, ExTxInfo{"3", 0, big.NewInt(9722), 1}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("20")}, ExTxInfo{"2", 0, big.NewInt(4236), 3}},
{&mempoolTx{height: 1, gasWanted: 1, tx: []byte("21")}, ExTxInfo{"1", 0, big.NewInt(8780), 4}},
}

var wait sync.WaitGroup
for _, exInfo := range testCases {
wait.Add(1)
go func(p Case) {
mempool.addAndSortTx(p.Tx, p.Info)
wait.Done()
}(exInfo)
}

wait.Wait()
}

0 comments on commit fb9021b

Please sign in to comment.