From 25886c55577a3c0068138ba01ed179c0de538060 Mon Sep 17 00:00:00 2001
From: goshawk
Date: Wed, 12 Jan 2022 18:40:21 +0200
Subject: [PATCH 1/6] Support both rate limiter and pendingPropagation in mempool

---
 pkg/core/mempool/mempool.go | 150 +++++++++++++++++++++++-------------
 1 file changed, 96 insertions(+), 54 deletions(-)

diff --git a/pkg/core/mempool/mempool.go b/pkg/core/mempool/mempool.go
index a72ae4a78..48d20727a 100644
--- a/pkg/core/mempool/mempool.go
+++ b/pkg/core/mempool/mempool.go
@@ -17,6 +17,7 @@ import (
     "time"
 
     "github.com/dusk-network/dusk-blockchain/pkg/util/diagnostics"
+    "golang.org/x/time/rate"
 
     "github.com/dusk-network/dusk-blockchain/pkg/config"
     "github.com/dusk-network/dusk-blockchain/pkg/core/consensus"
@@ -55,6 +56,8 @@ type Mempool struct {
     // verified txs to be included in next block.
     verified Pool
 
+    pendingPropagation chan TxDesc
+
     // the collector to listen for new accepted blocks.
     acceptedBlockChan <-chan block.Block
 
@@ -65,6 +68,8 @@ type Mempool struct {
 
     // the magic function that knows best what is valid chain Tx.
     verifier transactions.UnconfirmedTxProber
+
+    limiter *rate.Limiter
 }
 
 // checkTx is responsible to determine if a tx is valid or not.
@@ -103,6 +108,19 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra
 
     acceptedBlockChan, _ := consensus.InitAcceptedBlockUpdate(eventBus)
 
+    // Enable rate limiter from config
+    fromConfig := config.Get().Mempool.PropagateTimeout
+
+    var limiter *rate.Limiter
+    if len(fromConfig) > 0 {
+        timeout, err := time.ParseDuration(config.Get().Mempool.PropagateTimeout)
+        if err != nil {
+            log.WithError(err).Fatal("could not parse mempool propagation timeout")
+        }
+
+        limiter = rate.NewLimiter(rate.Every(timeout), 1)
+    }
+
     m := &Mempool{
         eventBus:             eventBus,
         latestBlockTimestamp: math.MinInt32,
@@ -111,6 +129,8 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra
         getMempoolTxsBySizeChan: getMempoolTxsBySizeChan,
         sendTxChan:              sendTxChan,
         verifier:                verifier,
+        limiter:                 limiter,
+        pendingPropagation:      make(chan TxDesc, 1000),
     }
 
     // Setting the pool where to cache verified transactions.
@@ -126,42 +146,80 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra
     return m
 }
 
-// Run spawns the mempool lifecycle routine. The whole mempool cycle is around
-// getting input from the outside world (from input channels) and provide the
-// actual list of the verified txs (onto output channel).
-//
-// All operations are always executed in a single go-routine so no
-// protection-by-mutex needed.
+// Run spawns the mempool lifecycle routines.
 func (m *Mempool) Run(ctx context.Context) {
-    go func() {
-        ticker := time.NewTicker(idleTime)
-        defer ticker.Stop()
-
-        for {
-            select {
-            // rpcbus methods.
-            case r := <-m.sendTxChan:
-                go handleRequest(r, m.processSendMempoolTxRequest, "SendTx")
-            case r := <-m.getMempoolTxsChan:
-                handleRequest(r, m.processGetMempoolTxsRequest, "GetMempoolTxs")
-            case r := <-m.getMempoolTxsBySizeChan:
-                handleRequest(r, m.processGetMempoolTxsBySizeRequest, "GetMempoolTxsBySize")
-            case b := <-m.acceptedBlockChan:
-                m.onBlock(b)
-            case <-ticker.C:
-                m.onIdle()
-            // Mempool terminating.
-            case <-ctx.Done():
-                // m.eventBus.Unsubscribe(topics.Tx, m.txSubscriberID)
-                return
+    // Main Loop
+    go m.Loop(ctx)
+
+    // Loop to drain pendingPropagation and try to propagate transactions
+    go m.propagateLoop(ctx)
+}
+
+// Loop listens for GetMempoolTxs requests and topics.AcceptedBlock events.
+func (m *Mempool) Loop(ctx context.Context) {
+    ticker := time.NewTicker(idleTime)
+    defer ticker.Stop()
+
+    for {
+        select {
+        // rpcbus methods.
+        case r := <-m.sendTxChan:
+            // TODO: This should be deleted once new wallet is integrated
+            go handleRequest(r, m.processSendMempoolTxRequest, "SendTx")
+        case r := <-m.getMempoolTxsChan:
+            handleRequest(r, m.processGetMempoolTxsRequest, "GetMempoolTxs")
+        case r := <-m.getMempoolTxsBySizeChan:
+            handleRequest(r, m.processGetMempoolTxsBySizeRequest, "GetMempoolTxsBySize")
+        case b := <-m.acceptedBlockChan:
+            m.onBlock(b)
+        case <-ticker.C:
+            m.onIdle()
+        case <-ctx.Done():
+            // Mempool terminating
+            return
+        }
+
+        ticker.Reset(idleTime)
+    }
+}
+
+func (m *Mempool) propagateLoop(ctx context.Context) {
+    for {
+        select {
+        case t := <-m.pendingPropagation:
+            // Ensure we propagate at a proper rate
+            if m.limiter != nil {
+                if err := m.limiter.Wait(ctx); err != nil {
+                    log.WithError(err).Error("failed to limit rate")
+                }
             }
 
-            ticker.Reset(idleTime)
+            txid, err := t.tx.CalculateHash()
+            if err != nil {
+                log.WithError(err).Error("failed to calc hash")
+                continue
+            }
+
+            if config.Get().Kadcast.Enabled {
+                // Broadcast full transaction data in kadcast
+                err = m.kadcastTx(t)
+            } else {
+                // Advertise the transaction hash to gossip network via "Inventory Vectors"
+                err = m.advertiseTx(txid)
+            }
+
+            if err != nil {
+                log.WithField("txid", hex.EncodeToString(txid)).WithError(err).Error("failed to propagate")
+            }
+
+        // Mempool terminating
+        case <-ctx.Done():
+            return
         }
-    }()
+    }
 }
 
-// ProcessTx handles a submitted tx from any source (rpcBus or eventBus).
+// ProcessTx processes a Transaction wire message.
 func (m *Mempool) ProcessTx(srcPeerID string, msg message.Message) ([]bytes.Buffer, error) {
     maxSizeBytes := config.Get().Mempool.MaxSizeMB * 1000 * 1000
     if m.verified.Size() > maxSizeBytes {
@@ -176,7 +234,10 @@ func (m *Mempool) ProcessTx(srcPeerID string, msg message.Message) ([]bytes.Buff
         h = msg.Header()[0]
     }
 
-    t := TxDesc{tx: msg.Payload().(transactions.ContractCall), received: time.Now(), size: uint(len(msg.Id())), kadHeight: h}
+    t := TxDesc{tx: msg.Payload().(transactions.ContractCall),
+        received: time.Now(),
+        size: uint(len(msg.Id())),
+        kadHeight: h}
 
     start := time.Now()
     txid, err := m.processTx(t)
@@ -234,33 +295,14 @@ func (m *Mempool) processTx(t TxDesc) ([]byte, error) {
         return txid, fmt.Errorf("store err - %v", err)
     }
 
-    // try to (re)propagate transaction in both gossip and kadcast networks
-    m.propagateTx(t, txid)
+    // queue transaction for (re)propagation
+    go func() {
+        m.pendingPropagation <- t
+    }()
 
     return txid, nil
 }
 
-// propagateTx (re)-propagate tx in gossip or kadcast network but not in both.
-func (m *Mempool) propagateTx(t TxDesc, txid []byte) {
-    if config.Get().Kadcast.Enabled {
-        // Kadcast complete transaction data
-        if err := m.kadcastTx(t); err != nil {
-            log.
-                WithError(err).
-                WithField("txid", txid).
-                Error("kadcast propagation failed")
-        }
-    } else {
-        // Advertise the transaction hash to gossip network via "Inventory Vectors"
-        if err := m.advertiseTx(txid); err != nil {
-            log.
-                WithError(err).
-                WithField("txid", txid).
- Error("gossip propagation failed") - } - } -} - func (m *Mempool) onBlock(b block.Block) { m.latestBlockTimestamp = b.Header.Timestamp m.removeAccepted(b) From f6665618d57976fe6f33225db46a0484b26f545a Mon Sep 17 00:00:00 2001 From: goshawk Date: Wed, 12 Jan 2022 18:42:33 +0200 Subject: [PATCH 2/6] Add mempool.propagateTimeout config --- pkg/config/groups.go | 9 +++++---- pkg/config/samples/default.dusk.toml | 2 ++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/config/groups.go b/pkg/config/groups.go index 1d2af96a5..45291abf5 100644 --- a/pkg/config/groups.go +++ b/pkg/config/groups.go @@ -169,10 +169,11 @@ type performanceConfiguration struct { } type mempoolConfiguration struct { - MaxSizeMB uint32 - PoolType string - PreallocTxs uint32 - MaxInvItems uint32 + MaxSizeMB uint32 + PoolType string + PreallocTxs uint32 + MaxInvItems uint32 + PropagateTimeout string } type consensusConfiguration struct { diff --git a/pkg/config/samples/default.dusk.toml b/pkg/config/samples/default.dusk.toml index 667a15dbd..6a43bd6bd 100644 --- a/pkg/config/samples/default.dusk.toml +++ b/pkg/config/samples/default.dusk.toml @@ -119,6 +119,8 @@ preallocTxs = 100 # Max number of items to respond with on topics.Mempool request # To disable topics.Mempool handling, set it to 0 maxInvItems = 10000 +# Back pressure timeout on transaction propagation +propagateTimeout = "100ms" # gRPC API service [rpc] From f3fd2ae08ea26c4e9d5cb3303ec1e89ab8f98804 Mon Sep 17 00:00:00 2001 From: goshawk Date: Wed, 12 Jan 2022 19:20:00 +0200 Subject: [PATCH 3/6] Add default propagate timeout in default test-harness profile --- harness/engine/profiles.go | 1 + 1 file changed, 1 insertion(+) diff --git a/harness/engine/profiles.go b/harness/engine/profiles.go index c412240c8..5cbe03b6c 100644 --- a/harness/engine/profiles.go +++ b/harness/engine/profiles.go @@ -109,6 +109,7 @@ func Profile1(index int, node *DuskNode, walletPath string) { viper.Set("mempool.poolType", "hashmap") viper.Set("mempool.preallocTxs", "100") viper.Set("mempool.maxInvItems", "10000") + viper.Set("mempool.propagateTimeout", "100ms") viper.Set("consensus.defaultlocktime", 1000) viper.Set("consensus.defaultoffset", 10) From fa341e7f511115d2540ad33929ba7c326b6c34bf Mon Sep 17 00:00:00 2001 From: goshawk Date: Thu, 13 Jan 2022 07:56:42 +0200 Subject: [PATCH 4/6] Adjust TestMeasureNetworkTPS for more accurate measurements --- harness/engine/utils.go | 4 +-- harness/tests/localnet_test.go | 46 ++++++++++++++++++---------------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/harness/engine/utils.go b/harness/engine/utils.go index 3e670eaef..2a563cd9b 100644 --- a/harness/engine/utils.go +++ b/harness/engine/utils.go @@ -569,8 +569,8 @@ func (n *Network) BatchSendTransferTx(t *testing.T, senderNodeInd uint, batchSiz for i := uint(0); i < batchSize; i++ { req := pb.TransferRequest{Amount: amount, Address: pubKey, Fee: fee} - clientDeadline := time.Now().Add(timeout) - ctx, cancel := context.WithDeadline(context.Background(), clientDeadline) + //clientDeadline := time.Now().Add(timeout) + ctx, cancel := context.WithTimeout(context.Background(), timeout) _, err := client.Transfer(ctx, &req) if err != nil { diff --git a/harness/tests/localnet_test.go b/harness/tests/localnet_test.go index 161ca5420..90e3ff455 100644 --- a/harness/tests/localnet_test.go +++ b/harness/tests/localnet_test.go @@ -305,31 +305,33 @@ func TestMeasureNetworkTPS(t *testing.T) { } // Wait until first two blocks are accepted - localNet.WaitUntil(t, 0, 2, 
2*time.Minute, 5*time.Second) + go func() { + localNet.WaitUntil(t, 0, 30, 5*time.Minute, 5*time.Second) - // Transaction Load test. - // Each of the nodes in the network starts sending transfer txs up to batchSize - batchSizeEnv, _ := os.LookupEnv("DUSK_TX_BATCH_SIZE") - batchSize, _ := strconv.Atoi(batchSizeEnv) + // Transaction Load test. + // Each of the nodes in the network starts sending transfer txs up to batchSize + batchSizeEnv, _ := os.LookupEnv("DUSK_TX_BATCH_SIZE") + batchSize, _ := strconv.Atoi(batchSizeEnv) - if batchSize == 0 { - batchSize = 300 - } + if batchSize == 0 { + batchSize = 300 + } - for i := uint(0); i < uint(localNet.Size()); i++ { - logrus.WithField("node_id", i).WithField("batch_size", batchSize). - Info("start sending transfer txs ...") - - // Start concurrently flooding the network with batch of Transfer transactions - go func(ind uint) { - if err := localNet.BatchSendTransferTx(t, ind, uint(batchSize), 100, 10, time.Minute); err != nil { - logrus.Error(err) - } - - logrus.WithField("node_id", ind).WithField("batch_size", batchSize). - Info("batch of transfer txs completed") - }(i) - } + for i := uint(0); i < uint(localNet.Size()); i++ { + logrus.WithField("node_id", i).WithField("batch_size", batchSize). + Info("start sending transfer txs ...") + + // Start concurrently flooding the network with batch of Transfer transactions + go func(ind uint) { + if err := localNet.BatchSendTransferTx(t, ind, uint(batchSize), 100, 10, 10*time.Minute); err != nil { + logrus.Error(err) + } + + logrus.WithField("node_id", ind).WithField("batch_size", batchSize). + Info("batch of transfer txs completed") + }(i) + } + }() // Start monitoring TPS metric of the network localNet.MonitorTPS(4 * time.Second) From ab6ad48553bf2d176244351e8cc9c8825777a1a2 Mon Sep 17 00:00:00 2001 From: goshawk Date: Thu, 13 Jan 2022 09:43:16 +0200 Subject: [PATCH 5/6] Trace propagate_timeout in mempool --- harness/engine/utils.go | 1 - pkg/core/mempool/mempool.go | 18 ++++++++++++++---- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/harness/engine/utils.go b/harness/engine/utils.go index 2a563cd9b..ad64cfaed 100644 --- a/harness/engine/utils.go +++ b/harness/engine/utils.go @@ -569,7 +569,6 @@ func (n *Network) BatchSendTransferTx(t *testing.T, senderNodeInd uint, batchSiz for i := uint(0); i < batchSize; i++ { req := pb.TransferRequest{Amount: amount, Address: pubKey, Fee: fee} - //clientDeadline := time.Now().Add(timeout) ctx, cancel := context.WithTimeout(context.Background(), timeout) _, err := client.Transfer(ctx, &req) diff --git a/pkg/core/mempool/mempool.go b/pkg/core/mempool/mempool.go index 48d20727a..1882e6c25 100644 --- a/pkg/core/mempool/mempool.go +++ b/pkg/core/mempool/mempool.go @@ -112,6 +112,7 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra fromConfig := config.Get().Mempool.PropagateTimeout var limiter *rate.Limiter + if len(fromConfig) > 0 { timeout, err := time.ParseDuration(config.Get().Mempool.PropagateTimeout) if err != nil { @@ -137,7 +138,14 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra // The pool is normally a Hashmap m.verified = m.newPool() - log.WithField("type", config.Get().Mempool.PoolType).Info("running") + l := log.WithField("backend_type", config.Get().Mempool.PoolType). 
+ WithField("max_size_mb", config.Get().Mempool.MaxSizeMB) + + if len(fromConfig) > 0 { + l = l.WithField("propagate_timeout", fromConfig) + } + + l.Info("running") if srv != nil { node.RegisterMempoolServer(srv, m) @@ -164,7 +172,7 @@ func (m *Mempool) Loop(ctx context.Context) { select { // rpcbus methods. case r := <-m.sendTxChan: - // TODO: This should be deleted once new wallet is integrated + // TODO: This handler should be deleted once new wallet is integrated go handleRequest(r, m.processSendMempoolTxRequest, "SendTx") case r := <-m.getMempoolTxsChan: handleRequest(r, m.processGetMempoolTxsRequest, "GetMempoolTxs") @@ -234,10 +242,12 @@ func (m *Mempool) ProcessTx(srcPeerID string, msg message.Message) ([]bytes.Buff h = msg.Header()[0] } - t := TxDesc{tx: msg.Payload().(transactions.ContractCall), + t := TxDesc{ + tx: msg.Payload().(transactions.ContractCall), received: time.Now(), size: uint(len(msg.Id())), - kadHeight: h} + kadHeight: h, + } start := time.Now() txid, err := m.processTx(t) From 9ab9f2f1889938cd7cfa8c76c1e4196eedaae795 Mon Sep 17 00:00:00 2001 From: goshawk Date: Thu, 13 Jan 2022 14:00:30 +0200 Subject: [PATCH 6/6] Introduce mempool.propagateBurst config --- harness/engine/profiles.go | 1 + pkg/config/groups.go | 1 + pkg/config/samples/default.dusk.toml | 3 ++- pkg/core/mempool/mempool.go | 26 +++++++++++++++----------- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/harness/engine/profiles.go b/harness/engine/profiles.go index 5cbe03b6c..e5c3512b8 100644 --- a/harness/engine/profiles.go +++ b/harness/engine/profiles.go @@ -110,6 +110,7 @@ func Profile1(index int, node *DuskNode, walletPath string) { viper.Set("mempool.preallocTxs", "100") viper.Set("mempool.maxInvItems", "10000") viper.Set("mempool.propagateTimeout", "100ms") + viper.Set("mempool.propagateBurst", 1) viper.Set("consensus.defaultlocktime", 1000) viper.Set("consensus.defaultoffset", 10) diff --git a/pkg/config/groups.go b/pkg/config/groups.go index 45291abf5..449efd3b7 100644 --- a/pkg/config/groups.go +++ b/pkg/config/groups.go @@ -174,6 +174,7 @@ type mempoolConfiguration struct { PreallocTxs uint32 MaxInvItems uint32 PropagateTimeout string + PropagateBurst uint32 } type consensusConfiguration struct { diff --git a/pkg/config/samples/default.dusk.toml b/pkg/config/samples/default.dusk.toml index 6a43bd6bd..c236e55fc 100644 --- a/pkg/config/samples/default.dusk.toml +++ b/pkg/config/samples/default.dusk.toml @@ -119,8 +119,9 @@ preallocTxs = 100 # Max number of items to respond with on topics.Mempool request # To disable topics.Mempool handling, set it to 0 maxInvItems = 10000 -# Back pressure timeout on transaction propagation +# Back pressure on transaction propagation propagateTimeout = "100ms" +propagateBurst = 1 # gRPC API service [rpc] diff --git a/pkg/core/mempool/mempool.go b/pkg/core/mempool/mempool.go index 1882e6c25..5bf4617aa 100644 --- a/pkg/core/mempool/mempool.go +++ b/pkg/core/mempool/mempool.go @@ -91,6 +91,9 @@ func (m *Mempool) checkTx(tx transactions.ContractCall) error { func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier transactions.UnconfirmedTxProber, srv *grpc.Server) *Mempool { log.Infof("create instance") + l := log.WithField("backend_type", config.Get().Mempool.PoolType). 
+ WithField("max_size_mb", config.Get().Mempool.MaxSizeMB) + getMempoolTxsChan := make(chan rpcbus.Request, 1) if err := rpcBus.Register(topics.GetMempoolTxs, getMempoolTxsChan); err != nil { log.WithError(err).Error("failed to register topics.GetMempoolTxs") @@ -109,17 +112,25 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra acceptedBlockChan, _ := consensus.InitAcceptedBlockUpdate(eventBus) // Enable rate limiter from config - fromConfig := config.Get().Mempool.PropagateTimeout + cfg := config.Get().Mempool var limiter *rate.Limiter - if len(fromConfig) > 0 { - timeout, err := time.ParseDuration(config.Get().Mempool.PropagateTimeout) + if len(cfg.PropagateTimeout) > 0 { + timeout, err := time.ParseDuration(cfg.PropagateTimeout) if err != nil { log.WithError(err).Fatal("could not parse mempool propagation timeout") } - limiter = rate.NewLimiter(rate.Every(timeout), 1) + burst := cfg.PropagateBurst + if burst == 0 { + burst = 1 + } + + limiter = rate.NewLimiter(rate.Every(timeout), int(burst)) + + l = l.WithField("propagate_timeout", cfg.PropagateTimeout). + WithField("propagate_burst", burst) } m := &Mempool{ @@ -138,13 +149,6 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra // The pool is normally a Hashmap m.verified = m.newPool() - l := log.WithField("backend_type", config.Get().Mempool.PoolType). - WithField("max_size_mb", config.Get().Mempool.MaxSizeMB) - - if len(fromConfig) > 0 { - l = l.WithField("propagate_timeout", fromConfig) - } - l.Info("running") if srv != nil {