diff --git a/harness/engine/profiles.go b/harness/engine/profiles.go
index c412240c8..e5c3512b8 100644
--- a/harness/engine/profiles.go
+++ b/harness/engine/profiles.go
@@ -109,6 +109,8 @@ func Profile1(index int, node *DuskNode, walletPath string) {
 	viper.Set("mempool.poolType", "hashmap")
 	viper.Set("mempool.preallocTxs", "100")
 	viper.Set("mempool.maxInvItems", "10000")
+	viper.Set("mempool.propagateTimeout", "100ms")
+	viper.Set("mempool.propagateBurst", 1)
 
 	viper.Set("consensus.defaultlocktime", 1000)
 	viper.Set("consensus.defaultoffset", 10)
diff --git a/harness/engine/utils.go b/harness/engine/utils.go
index 3e670eaef..ad64cfaed 100644
--- a/harness/engine/utils.go
+++ b/harness/engine/utils.go
@@ -569,8 +569,7 @@ func (n *Network) BatchSendTransferTx(t *testing.T, senderNodeInd uint, batchSiz
 	for i := uint(0); i < batchSize; i++ {
 		req := pb.TransferRequest{Amount: amount, Address: pubKey, Fee: fee}
 
-		clientDeadline := time.Now().Add(timeout)
-		ctx, cancel := context.WithDeadline(context.Background(), clientDeadline)
+		ctx, cancel := context.WithTimeout(context.Background(), timeout)
 
 		_, err := client.Transfer(ctx, &req)
 		if err != nil {
diff --git a/harness/tests/localnet_test.go b/harness/tests/localnet_test.go
index 161ca5420..90e3ff455 100644
--- a/harness/tests/localnet_test.go
+++ b/harness/tests/localnet_test.go
@@ -305,31 +305,33 @@ func TestMeasureNetworkTPS(t *testing.T) {
 	}
 
-	// Wait until first two blocks are accepted
-	localNet.WaitUntil(t, 0, 2, 2*time.Minute, 5*time.Second)
+	// Wait in the background until the first 30 blocks are accepted
+	go func() {
+		localNet.WaitUntil(t, 0, 30, 5*time.Minute, 5*time.Second)
 
-	// Transaction Load test.
-	// Each of the nodes in the network starts sending transfer txs up to batchSize
-	batchSizeEnv, _ := os.LookupEnv("DUSK_TX_BATCH_SIZE")
-	batchSize, _ := strconv.Atoi(batchSizeEnv)
+		// Transaction Load test.
+		// Each of the nodes in the network starts sending transfer txs up to batchSize
+		batchSizeEnv, _ := os.LookupEnv("DUSK_TX_BATCH_SIZE")
+		batchSize, _ := strconv.Atoi(batchSizeEnv)
 
-	if batchSize == 0 {
-		batchSize = 300
-	}
+		if batchSize == 0 {
+			batchSize = 300
+		}
 
-	for i := uint(0); i < uint(localNet.Size()); i++ {
-		logrus.WithField("node_id", i).WithField("batch_size", batchSize).
-			Info("start sending transfer txs ...")
-
-		// Start concurrently flooding the network with batch of Transfer transactions
-		go func(ind uint) {
-			if err := localNet.BatchSendTransferTx(t, ind, uint(batchSize), 100, 10, time.Minute); err != nil {
-				logrus.Error(err)
-			}
-
-			logrus.WithField("node_id", ind).WithField("batch_size", batchSize).
-				Info("batch of transfer txs completed")
-		}(i)
-	}
+		for i := uint(0); i < uint(localNet.Size()); i++ {
+			logrus.WithField("node_id", i).WithField("batch_size", batchSize).
+				Info("start sending transfer txs ...")
+
+			// Start concurrently flooding the network with a batch of Transfer transactions
+			go func(ind uint) {
+				if err := localNet.BatchSendTransferTx(t, ind, uint(batchSize), 100, 10, 10*time.Minute); err != nil {
+					logrus.Error(err)
+				}
+
+				logrus.WithField("node_id", ind).WithField("batch_size", batchSize).
+					Info("batch of transfer txs completed")
+			}(i)
+		}
+	}()
 
 	// Start monitoring TPS metric of the network
 	localNet.MonitorTPS(4 * time.Second)
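The utils.go hunk is a pure simplification: per the context package docs, context.WithTimeout(parent, d) is defined as context.WithDeadline(parent, time.Now().Add(d)). A minimal sketch of the equivalence (the values are illustrative):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	timeout := time.Minute

	// Before: explicit deadline arithmetic.
	oldCtx, cancelOld := context.WithDeadline(context.Background(), time.Now().Add(timeout))
	defer cancelOld()

	// After: identical semantics, one line shorter.
	newCtx, cancelNew := context.WithTimeout(context.Background(), timeout)
	defer cancelNew()

	d1, _ := oldCtx.Deadline()
	d2, _ := newCtx.Deadline()
	fmt.Println(d2.Sub(d1)) // ~0: both deadlines land one minute from now
}
```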
diff --git a/pkg/config/groups.go b/pkg/config/groups.go
index 1d2af96a5..449efd3b7 100644
--- a/pkg/config/groups.go
+++ b/pkg/config/groups.go
@@ -169,10 +169,12 @@ type performanceConfiguration struct {
 }
 
 type mempoolConfiguration struct {
-	MaxSizeMB   uint32
-	PoolType    string
-	PreallocTxs uint32
-	MaxInvItems uint32
+	MaxSizeMB        uint32
+	PoolType         string
+	PreallocTxs      uint32
+	MaxInvItems      uint32
+	PropagateTimeout string
+	PropagateBurst   uint32
 }
 
 type consensusConfiguration struct {
diff --git a/pkg/config/samples/default.dusk.toml b/pkg/config/samples/default.dusk.toml
index 667a15dbd..c236e55fc 100644
--- a/pkg/config/samples/default.dusk.toml
+++ b/pkg/config/samples/default.dusk.toml
@@ -119,6 +119,9 @@ preallocTxs = 100
 # Max number of items to respond with on topics.Mempool request
 # To disable topics.Mempool handling, set it to 0
 maxInvItems = 10000
+# Back-pressure on transaction propagation: minimum delay between two propagated txs, and the max burst size
+propagateTimeout = "100ms"
+propagateBurst = 1
 
 # gRPC API service
 [rpc]
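Note that propagateTimeout is parsed with time.ParseDuration at startup, so the value must carry a unit suffix; a bare number would trip the Fatal path in NewMempool below. A quick sketch (the sample strings are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	for _, s := range []string{"100ms", "1s", "100"} {
		d, err := time.ParseDuration(s)
		if err != nil {
			// "100" fails: ParseDuration requires a unit (e.g. ns, us, ms, s, m, h)
			fmt.Println(s, "->", err)
			continue
		}
		fmt.Println(s, "->", d)
	}
}
```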
diff --git a/pkg/core/mempool/mempool.go b/pkg/core/mempool/mempool.go
index a72ae4a78..5bf4617aa 100644
--- a/pkg/core/mempool/mempool.go
+++ b/pkg/core/mempool/mempool.go
@@ -17,6 +17,7 @@ import (
 	"time"
 
 	"github.com/dusk-network/dusk-blockchain/pkg/util/diagnostics"
+	"golang.org/x/time/rate"
 
 	"github.com/dusk-network/dusk-blockchain/pkg/config"
 	"github.com/dusk-network/dusk-blockchain/pkg/core/consensus"
@@ -55,6 +56,8 @@ type Mempool struct {
 	// verified txs to be included in next block.
 	verified Pool
 
+	pendingPropagation chan TxDesc
+
 	// the collector to listen for new accepted blocks.
 	acceptedBlockChan <-chan block.Block
 
@@ -65,6 +68,8 @@ type Mempool struct {
 	// the magic function that knows best what is valid chain Tx.
 	verifier transactions.UnconfirmedTxProber
+
+	limiter *rate.Limiter
 }
 
 // checkTx is responsible to determine if a tx is valid or not.
@@ -86,6 +91,9 @@ func (m *Mempool) checkTx(tx transactions.ContractCall) error {
 func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier transactions.UnconfirmedTxProber, srv *grpc.Server) *Mempool {
 	log.Infof("create instance")
 
+	l := log.WithField("backend_type", config.Get().Mempool.PoolType).
+		WithField("max_size_mb", config.Get().Mempool.MaxSizeMB)
+
 	getMempoolTxsChan := make(chan rpcbus.Request, 1)
 	if err := rpcBus.Register(topics.GetMempoolTxs, getMempoolTxsChan); err != nil {
 		log.WithError(err).Error("failed to register topics.GetMempoolTxs")
@@ -103,6 +111,28 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra
 	acceptedBlockChan, _ := consensus.InitAcceptedBlockUpdate(eventBus)
 
+	// Enable the rate limiter only if a propagation timeout is configured
+	cfg := config.Get().Mempool
+
+	var limiter *rate.Limiter
+
+	if len(cfg.PropagateTimeout) > 0 {
+		timeout, err := time.ParseDuration(cfg.PropagateTimeout)
+		if err != nil {
+			log.WithError(err).Fatal("could not parse mempool propagation timeout")
+		}
+
+		burst := cfg.PropagateBurst
+		if burst == 0 {
+			burst = 1
+		}
+
+		limiter = rate.NewLimiter(rate.Every(timeout), int(burst))
+
+		l = l.WithField("propagate_timeout", cfg.PropagateTimeout).
+			WithField("propagate_burst", burst)
+	}
+
 	m := &Mempool{
 		eventBus:                eventBus,
 		latestBlockTimestamp:    math.MinInt32,
@@ -111,13 +141,15 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra
 		getMempoolTxsBySizeChan: getMempoolTxsBySizeChan,
 		sendTxChan:              sendTxChan,
 		verifier:                verifier,
+		limiter:                 limiter,
+		pendingPropagation:      make(chan TxDesc, 1000),
 	}
 
 	// Setting the pool where to cache verified transactions.
 	// The pool is normally a Hashmap
 	m.verified = m.newPool()
 
-	log.WithField("type", config.Get().Mempool.PoolType).Info("running")
+	l.Info("running")
 
 	if srv != nil {
 		node.RegisterMempoolServer(srv, m)
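Once parsed, the two settings map directly onto golang.org/x/time/rate: rate.Every converts the interval into an events-per-second Limit, and the burst value bounds how many propagations may fire back-to-back. A small sketch under the 100ms/1 defaults:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Defaults from above: propagateTimeout = "100ms", propagateBurst = 1.
	limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)

	fmt.Println(limiter.Limit()) // 10 tokens per second
	fmt.Println(limiter.Burst()) // 1

	// Allow consumes a token without blocking; with burst 1 the second
	// immediate call is denied until ~100ms have passed.
	fmt.Println(limiter.Allow()) // true
	fmt.Println(limiter.Allow()) // false
}
```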
@@ -126,42 +158,80 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra
 	return m
 }
 
-// Run spawns the mempool lifecycle routine. The whole mempool cycle is around
-// getting input from the outside world (from input channels) and provide the
-// actual list of the verified txs (onto output channel).
-//
-// All operations are always executed in a single go-routine so no
-// protection-by-mutex needed.
+// Run spawns the mempool lifecycle routines.
 func (m *Mempool) Run(ctx context.Context) {
-	go func() {
-		ticker := time.NewTicker(idleTime)
-		defer ticker.Stop()
-
-		for {
-			select {
-			// rpcbus methods.
-			case r := <-m.sendTxChan:
-				go handleRequest(r, m.processSendMempoolTxRequest, "SendTx")
-			case r := <-m.getMempoolTxsChan:
-				handleRequest(r, m.processGetMempoolTxsRequest, "GetMempoolTxs")
-			case r := <-m.getMempoolTxsBySizeChan:
-				handleRequest(r, m.processGetMempoolTxsBySizeRequest, "GetMempoolTxsBySize")
-			case b := <-m.acceptedBlockChan:
-				m.onBlock(b)
-			case <-ticker.C:
-				m.onIdle()
-			// Mempool terminating.
-			case <-ctx.Done():
-				// m.eventBus.Unsubscribe(topics.Tx, m.txSubscriberID)
-				return
+	// Main loop
+	go m.Loop(ctx)
+
+	// Propagation loop: drain pendingPropagation and propagate transactions
+	go m.propagateLoop(ctx)
+}
+
+// Loop listens for GetMempoolTxs requests and topics.AcceptedBlock events.
+func (m *Mempool) Loop(ctx context.Context) {
+	ticker := time.NewTicker(idleTime)
+	defer ticker.Stop()
+
+	for {
+		select {
+		// rpcbus methods.
+		case r := <-m.sendTxChan:
+			// TODO: This handler should be deleted once new wallet is integrated
+			go handleRequest(r, m.processSendMempoolTxRequest, "SendTx")
+		case r := <-m.getMempoolTxsChan:
+			handleRequest(r, m.processGetMempoolTxsRequest, "GetMempoolTxs")
+		case r := <-m.getMempoolTxsBySizeChan:
+			handleRequest(r, m.processGetMempoolTxsBySizeRequest, "GetMempoolTxsBySize")
+		case b := <-m.acceptedBlockChan:
+			m.onBlock(b)
+		case <-ticker.C:
+			m.onIdle()
+		case <-ctx.Done():
+			// Mempool terminating
+			return
+		}
+
+		ticker.Reset(idleTime)
+	}
+}
+
+func (m *Mempool) propagateLoop(ctx context.Context) {
+	for {
+		select {
+		case t := <-m.pendingPropagation:
+			// Ensure we propagate at the configured rate
+			if m.limiter != nil {
+				if err := m.limiter.Wait(ctx); err != nil {
+					log.WithError(err).Error("failed to limit rate")
+				}
+			}
+
+			txid, err := t.tx.CalculateHash()
+			if err != nil {
+				log.WithError(err).Error("failed to calculate tx hash")
+				continue
 			}
 
-			ticker.Reset(idleTime)
+			if config.Get().Kadcast.Enabled {
+				// Broadcast full transaction data via kadcast
+				err = m.kadcastTx(t)
+			} else {
+				// Advertise the transaction hash to the gossip network via "Inventory Vectors"
+				err = m.advertiseTx(txid)
+			}
+
+			if err != nil {
+				log.WithField("txid", hex.EncodeToString(txid)).WithError(err).Error("failed to propagate")
+			}
+
+		// Mempool terminating
+		case <-ctx.Done():
+			return
 		}
-	}()
+	}
 }
 
-// ProcessTx handles a submitted tx from any source (rpcBus or eventBus).
+// ProcessTx processes a Transaction wire message.
 func (m *Mempool) ProcessTx(srcPeerID string, msg message.Message) ([]bytes.Buffer, error) {
 	maxSizeBytes := config.Get().Mempool.MaxSizeMB * 1000 * 1000
 	if m.verified.Size() > maxSizeBytes {
@@ -176,7 +246,12 @@
 		h = msg.Header()[0]
 	}
 
-	t := TxDesc{tx: msg.Payload().(transactions.ContractCall), received: time.Now(), size: uint(len(msg.Id())), kadHeight: h}
+	t := TxDesc{
+		tx:        msg.Payload().(transactions.ContractCall),
+		received:  time.Now(),
+		size:      uint(len(msg.Id())),
+		kadHeight: h,
+	}
 
 	start := time.Now()
 	txid, err := m.processTx(t)
@@ -234,33 +309,14 @@ func (m *Mempool) processTx(t TxDesc) ([]byte, error) {
 		return txid, fmt.Errorf("store err - %v", err)
 	}
 
-	// try to (re)propagate transaction in both gossip and kadcast networks
-	m.propagateTx(t, txid)
+	// Queue transaction for (re)propagation
+	go func() {
+		m.pendingPropagation <- t
+	}()
 
 	return txid, nil
 }
 
-// propagateTx (re)-propagate tx in gossip or kadcast network but not in both.
-func (m *Mempool) propagateTx(t TxDesc, txid []byte) {
-	if config.Get().Kadcast.Enabled {
-		// Kadcast complete transaction data
-		if err := m.kadcastTx(t); err != nil {
-			log.
-				WithError(err).
-				WithField("txid", txid).
-				Error("kadcast propagation failed")
-		}
-	} else {
-		// Advertise the transaction hash to gossip network via "Inventory Vectors"
-		if err := m.advertiseTx(txid); err != nil {
-			log.
-				WithError(err).
-				WithField("txid", txid).
-				Error("gossip propagation failed")
-		}
-	}
-}
-
 func (m *Mempool) onBlock(b block.Block) {
 	m.latestBlockTimestamp = b.Header.Timestamp
 	m.removeAccepted(b)
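Taken together, the propagation path is now a producer/consumer pair: processTx enqueues into the buffered pendingPropagation channel and returns immediately, while propagateLoop drains the queue behind limiter.Wait. A self-contained sketch of that shape, assuming the same 100ms/1 defaults (the propagate stub and tx IDs are hypothetical stand-ins for kadcastTx/advertiseTx):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

// propagate is a hypothetical stand-in for kadcastTx / advertiseTx.
func propagate(txid string) {
	fmt.Printf("%s propagated %s\n", time.Now().Format("15:04:05.000"), txid)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Mirrors the new mempool defaults: 100ms between propagations, burst 1,
	// and a buffered queue decoupling tx acceptance from propagation.
	limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)
	pending := make(chan string, 1000)

	done := make(chan struct{})

	go func() {
		defer close(done)

		for {
			select {
			case txid := <-pending:
				// Back-pressure: block until the limiter grants a token.
				if err := limiter.Wait(ctx); err != nil {
					return // context cancelled while waiting
				}
				propagate(txid)
			case <-ctx.Done():
				return
			}
		}
	}()

	// Enqueue a burst of five txs; the output shows ~100ms spacing.
	for i := 0; i < 5; i++ {
		pending <- fmt.Sprintf("tx-%d", i)
	}

	time.Sleep(time.Second)
	cancel()
	<-done
}
```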