diff --git a/libs/tendermint/libs/clist/clist.go b/libs/tendermint/libs/clist/clist.go index 29b441d1c9..28bd217fb2 100644 --- a/libs/tendermint/libs/clist/clist.go +++ b/libs/tendermint/libs/clist/clist.go @@ -393,7 +393,7 @@ func (l *CList) PushBack(v interface{}) *CElement { func (l *CList) Remove(e *CElement) interface{} { l.mtx.Lock() if e.removed { - e.mtx.Unlock() + l.mtx.Unlock() return e.Value } prev := e.Prev() @@ -477,6 +477,8 @@ func (l *CList) InsertElement(ele *CElement) *CElement { if ele.Nonce < cur.Nonce { // small Nonce put ahead cur = cur.prev + } else if ele.Nonce == cur.Nonce { + panic(fmt.Sprintf("should not happen: insert same nonce transactions, ele=%+v,cur=%+v", ele, cur)) } else { // The tx of the same Nonce has been processed in checkElement, and there are only cases of big nonce // Big Nonce’s transaction, regardless of gasPrice, has to be in the back diff --git a/libs/tendermint/mempool/clist_mempool.go b/libs/tendermint/mempool/clist_mempool.go index e0973a8435..96fcd6aa8b 100644 --- a/libs/tendermint/mempool/clist_mempool.go +++ b/libs/tendermint/mempool/clist_mempool.go @@ -83,8 +83,8 @@ type CListMempool struct { metrics *Metrics - addressRecord *AddressRecord - checkRepeatedMtx sync.Mutex + addressRecord *AddressRecord + addAndSortMtx sync.Mutex pendingPool *PendingPool accountRetriever AccountRetriever @@ -412,6 +412,8 @@ func (mem *CListMempool) reqResCb( // Called from: // - resCbFirstTime (lock not held) if tx is valid func (mem *CListMempool) addAndSortTx(memTx *mempoolTx, info ExTxInfo) error { + mem.addAndSortMtx.Lock() + defer mem.addAndSortMtx.Unlock() // Delete the same Nonce transaction from the same account if res := mem.checkRepeatedElement(info); res == -1 { return errors.New(fmt.Sprintf("Failed to replace tx for acccount %s with nonce %d, "+ @@ -839,7 +841,6 @@ func (mem *CListMempool) Update( ele := e.(*clist.CElement) addr = ele.Address nonce = ele.Nonce - mem.removeTx(tx, ele, false) mem.logger.Debug("Mempool update", "address", ele.Address, "nonce", ele.Nonce) } else if mem.txInfoparser != nil { @@ -960,8 +961,6 @@ func (mem *CListMempool) reOrgTxs(addr string) *CListMempool { } func (mem *CListMempool) checkRepeatedElement(info ExTxInfo) int { - mem.checkRepeatedMtx.Lock() - defer mem.checkRepeatedMtx.Unlock() repeatElement := 0 if userMap, ok := mem.addressRecord.GetItem(info.Sender); ok { for _, node := range userMap { diff --git a/libs/tendermint/mempool/clist_mempool_test.go b/libs/tendermint/mempool/clist_mempool_test.go index 6a615fbea7..f816173a01 100644 --- a/libs/tendermint/mempool/clist_mempool_test.go +++ b/libs/tendermint/mempool/clist_mempool_test.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "strconv" + "sync" "testing" "time" @@ -901,3 +902,54 @@ func TestMultiPriceBump(t *testing.T) { require.True(t, tt.targetPrice.Cmp(MultiPriceBump(tt.rawPrice, int64(tt.priceBump))) == 0) } } + +func TestAddAndSortTxConcurrency(t *testing.T) { + app := kvstore.NewApplication() + cc := proxy.NewLocalClientCreator(app) + config := cfg.ResetTestRoot("mempool_test") + config.Mempool.SortTxByGp = true + mempool, cleanup := newMempoolWithAppAndConfig(cc, config) + defer cleanup() + + //tx := &mempoolTx{height: 1, gasWanted: 1, tx:[]byte{0x01}} + type Case struct { + Tx *mempoolTx + Info ExTxInfo + } + + testCases := []Case{ + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("1")}, ExTxInfo{"1", 0, big.NewInt(3780), 0}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("2")}, ExTxInfo{"1", 0, big.NewInt(3245), 1}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("3")}, ExTxInfo{"1", 0, big.NewInt(5315), 2}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("4")}, ExTxInfo{"1", 0, big.NewInt(4526), 3}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("5")}, ExTxInfo{"1", 0, big.NewInt(2140), 4}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("6")}, ExTxInfo{"1", 0, big.NewInt(4227), 5}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("7")}, ExTxInfo{"2", 0, big.NewInt(2161), 0}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("8")}, ExTxInfo{"2", 0, big.NewInt(5740), 1}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("9")}, ExTxInfo{"2", 0, big.NewInt(6574), 2}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("10")}, ExTxInfo{"2", 0, big.NewInt(9630), 3}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("11")}, ExTxInfo{"2", 0, big.NewInt(6554), 4}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("12")}, ExTxInfo{"2", 0, big.NewInt(5609), 2}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("13")}, ExTxInfo{"3", 0, big.NewInt(2791), 0}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("14")}, ExTxInfo{"3", 0, big.NewInt(2698), 1}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("15")}, ExTxInfo{"2", 0, big.NewInt(6925), 3}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("16")}, ExTxInfo{"1", 0, big.NewInt(4171), 3}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("17")}, ExTxInfo{"1", 0, big.NewInt(2965), 2}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("18")}, ExTxInfo{"3", 0, big.NewInt(2484), 2}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("19")}, ExTxInfo{"3", 0, big.NewInt(9722), 1}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("20")}, ExTxInfo{"2", 0, big.NewInt(4236), 3}}, + {&mempoolTx{height: 1, gasWanted: 1, tx: []byte("21")}, ExTxInfo{"1", 0, big.NewInt(8780), 4}}, + } + + var wait sync.WaitGroup + for _, exInfo := range testCases { + wait.Add(1) + go func(p Case) { + mempool.addAndSortTx(p.Tx, p.Info) + wait.Done() + }(exInfo) + } + + wait.Wait() +} +