Skip to content

Commit

Permalink
Merge pull request #1238 from dusk-network/fix-1237
Browse files Browse the repository at this point in the history
Support both rate limiter and pendingPropagation in mempool
  • Loading branch information
goshawk-3 authored Jan 13, 2022
2 parents 9cdefec + 9ab9f2f commit 8a0b983
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 83 deletions.
2 changes: 2 additions & 0 deletions harness/engine/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ func Profile1(index int, node *DuskNode, walletPath string) {
viper.Set("mempool.poolType", "hashmap")
viper.Set("mempool.preallocTxs", "100")
viper.Set("mempool.maxInvItems", "10000")
viper.Set("mempool.propagateTimeout", "100ms")
viper.Set("mempool.propagateBurst", 1)

viper.Set("consensus.defaultlocktime", 1000)
viper.Set("consensus.defaultoffset", 10)
Expand Down
3 changes: 1 addition & 2 deletions harness/engine/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,8 +569,7 @@ func (n *Network) BatchSendTransferTx(t *testing.T, senderNodeInd uint, batchSiz
for i := uint(0); i < batchSize; i++ {
req := pb.TransferRequest{Amount: amount, Address: pubKey, Fee: fee}

clientDeadline := time.Now().Add(timeout)
ctx, cancel := context.WithDeadline(context.Background(), clientDeadline)
ctx, cancel := context.WithTimeout(context.Background(), timeout)

_, err := client.Transfer(ctx, &req)
if err != nil {
Expand Down
46 changes: 24 additions & 22 deletions harness/tests/localnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,31 +305,33 @@ func TestMeasureNetworkTPS(t *testing.T) {
}

// Wait until first two blocks are accepted
localNet.WaitUntil(t, 0, 2, 2*time.Minute, 5*time.Second)
go func() {
localNet.WaitUntil(t, 0, 30, 5*time.Minute, 5*time.Second)

// Transaction Load test.
// Each of the nodes in the network starts sending transfer txs up to batchSize
batchSizeEnv, _ := os.LookupEnv("DUSK_TX_BATCH_SIZE")
batchSize, _ := strconv.Atoi(batchSizeEnv)
// Transaction Load test.
// Each of the nodes in the network starts sending transfer txs up to batchSize
batchSizeEnv, _ := os.LookupEnv("DUSK_TX_BATCH_SIZE")
batchSize, _ := strconv.Atoi(batchSizeEnv)

if batchSize == 0 {
batchSize = 300
}
if batchSize == 0 {
batchSize = 300
}

for i := uint(0); i < uint(localNet.Size()); i++ {
logrus.WithField("node_id", i).WithField("batch_size", batchSize).
Info("start sending transfer txs ...")

// Start concurrently flooding the network with batch of Transfer transactions
go func(ind uint) {
if err := localNet.BatchSendTransferTx(t, ind, uint(batchSize), 100, 10, time.Minute); err != nil {
logrus.Error(err)
}

logrus.WithField("node_id", ind).WithField("batch_size", batchSize).
Info("batch of transfer txs completed")
}(i)
}
for i := uint(0); i < uint(localNet.Size()); i++ {
logrus.WithField("node_id", i).WithField("batch_size", batchSize).
Info("start sending transfer txs ...")

// Start concurrently flooding the network with batch of Transfer transactions
go func(ind uint) {
if err := localNet.BatchSendTransferTx(t, ind, uint(batchSize), 100, 10, 10*time.Minute); err != nil {
logrus.Error(err)
}

logrus.WithField("node_id", ind).WithField("batch_size", batchSize).
Info("batch of transfer txs completed")
}(i)
}
}()

// Start monitoring TPS metric of the network
localNet.MonitorTPS(4 * time.Second)
Expand Down
10 changes: 6 additions & 4 deletions pkg/config/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,12 @@ type performanceConfiguration struct {
}

type mempoolConfiguration struct {
MaxSizeMB uint32
PoolType string
PreallocTxs uint32
MaxInvItems uint32
MaxSizeMB uint32
PoolType string
PreallocTxs uint32
MaxInvItems uint32
PropagateTimeout string
PropagateBurst uint32
}

type consensusConfiguration struct {
Expand Down
3 changes: 3 additions & 0 deletions pkg/config/samples/default.dusk.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ preallocTxs = 100
# Max number of items to respond with on topics.Mempool request
# To disable topics.Mempool handling, set it to 0
maxInvItems = 10000
# Back pressure on transaction propagation
propagateTimeout = "100ms"
propagateBurst = 1

# gRPC API service
[rpc]
Expand Down
166 changes: 111 additions & 55 deletions pkg/core/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"github.com/dusk-network/dusk-blockchain/pkg/util/diagnostics"
"golang.org/x/time/rate"

"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus"
Expand Down Expand Up @@ -55,6 +56,8 @@ type Mempool struct {
// verified txs to be included in next block.
verified Pool

pendingPropagation chan TxDesc

// the collector to listen for new accepted blocks.
acceptedBlockChan <-chan block.Block

Expand All @@ -65,6 +68,8 @@ type Mempool struct {

// the magic function that knows best what is valid chain Tx.
verifier transactions.UnconfirmedTxProber

limiter *rate.Limiter
}

// checkTx is responsible to determine if a tx is valid or not.
Expand All @@ -86,6 +91,9 @@ func (m *Mempool) checkTx(tx transactions.ContractCall) error {
func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier transactions.UnconfirmedTxProber, srv *grpc.Server) *Mempool {
log.Infof("create instance")

l := log.WithField("backend_type", config.Get().Mempool.PoolType).
WithField("max_size_mb", config.Get().Mempool.MaxSizeMB)

getMempoolTxsChan := make(chan rpcbus.Request, 1)
if err := rpcBus.Register(topics.GetMempoolTxs, getMempoolTxsChan); err != nil {
log.WithError(err).Error("failed to register topics.GetMempoolTxs")
Expand All @@ -103,6 +111,28 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra

acceptedBlockChan, _ := consensus.InitAcceptedBlockUpdate(eventBus)

// Enable rate limiter from config
cfg := config.Get().Mempool

var limiter *rate.Limiter

if len(cfg.PropagateTimeout) > 0 {
timeout, err := time.ParseDuration(cfg.PropagateTimeout)
if err != nil {
log.WithError(err).Fatal("could not parse mempool propagation timeout")
}

burst := cfg.PropagateBurst
if burst == 0 {
burst = 1
}

limiter = rate.NewLimiter(rate.Every(timeout), int(burst))

l = l.WithField("propagate_timeout", cfg.PropagateTimeout).
WithField("propagate_burst", burst)
}

m := &Mempool{
eventBus: eventBus,
latestBlockTimestamp: math.MinInt32,
Expand All @@ -111,13 +141,15 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra
getMempoolTxsBySizeChan: getMempoolTxsBySizeChan,
sendTxChan: sendTxChan,
verifier: verifier,
limiter: limiter,
pendingPropagation: make(chan TxDesc, 1000),
}

// Setting the pool where to cache verified transactions.
// The pool is normally a Hashmap
m.verified = m.newPool()

log.WithField("type", config.Get().Mempool.PoolType).Info("running")
l.Info("running")

if srv != nil {
node.RegisterMempoolServer(srv, m)
Expand All @@ -126,42 +158,80 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier tra
return m
}

// Run spawns the mempool lifecycle routine. The whole mempool cycle is around
// getting input from the outside world (from input channels) and provide the
// actual list of the verified txs (onto output channel).
//
// All operations are always executed in a single go-routine so no
// protection-by-mutex needed.
// Run spawns the mempool lifecycle routines.
func (m *Mempool) Run(ctx context.Context) {
go func() {
ticker := time.NewTicker(idleTime)
defer ticker.Stop()

for {
select {
// rpcbus methods.
case r := <-m.sendTxChan:
go handleRequest(r, m.processSendMempoolTxRequest, "SendTx")
case r := <-m.getMempoolTxsChan:
handleRequest(r, m.processGetMempoolTxsRequest, "GetMempoolTxs")
case r := <-m.getMempoolTxsBySizeChan:
handleRequest(r, m.processGetMempoolTxsBySizeRequest, "GetMempoolTxsBySize")
case b := <-m.acceptedBlockChan:
m.onBlock(b)
case <-ticker.C:
m.onIdle()
// Mempool terminating.
case <-ctx.Done():
// m.eventBus.Unsubscribe(topics.Tx, m.txSubscriberID)
return
// Main Loop
go m.Loop(ctx)

// Loop to drain pendingPropagation and try to propagate transaction
go m.propagateLoop(ctx)
}

// Loop listens for GetMempoolTxs request and topics.AcceptedBlock events.
func (m *Mempool) Loop(ctx context.Context) {
ticker := time.NewTicker(idleTime)
defer ticker.Stop()

for {
select {
// rpcbus methods.
case r := <-m.sendTxChan:
// TODO: This handler should be deleted once new wallet is integrated
go handleRequest(r, m.processSendMempoolTxRequest, "SendTx")
case r := <-m.getMempoolTxsChan:
handleRequest(r, m.processGetMempoolTxsRequest, "GetMempoolTxs")
case r := <-m.getMempoolTxsBySizeChan:
handleRequest(r, m.processGetMempoolTxsBySizeRequest, "GetMempoolTxsBySize")
case b := <-m.acceptedBlockChan:
m.onBlock(b)
case <-ticker.C:
m.onIdle()
case <-ctx.Done():
// Mempool terminating
return
}

ticker.Reset(idleTime)
}
}

func (m *Mempool) propagateLoop(ctx context.Context) {
for {
select {
case t := <-m.pendingPropagation:
// Ensure we propagate at proper rate
if m.limiter != nil {
if err := m.limiter.Wait(ctx); err != nil {
log.WithError(err).Error("failed to limit rate")
}
}

txid, err := t.tx.CalculateHash()
if err != nil {
log.WithError(err).Error("failed to calc hash")
continue
}

ticker.Reset(idleTime)
if config.Get().Kadcast.Enabled {
// Broadcast ful transaction data in kadcast
err = m.kadcastTx(t)
} else {
// Advertise the transaction hash to gossip network via "Inventory Vectors"
err = m.advertiseTx(txid)
}

if err != nil {
log.WithField("txid", hex.EncodeToString(txid)).WithError(err).Error("failed to propagate")
}

// Mempool terminating
case <-ctx.Done():
return
}
}()
}
}

// ProcessTx handles a submitted tx from any source (rpcBus or eventBus).
// ProcessTx processes a Transaction wire message.
func (m *Mempool) ProcessTx(srcPeerID string, msg message.Message) ([]bytes.Buffer, error) {
maxSizeBytes := config.Get().Mempool.MaxSizeMB * 1000 * 1000
if m.verified.Size() > maxSizeBytes {
Expand All @@ -176,7 +246,12 @@ func (m *Mempool) ProcessTx(srcPeerID string, msg message.Message) ([]bytes.Buff
h = msg.Header()[0]
}

t := TxDesc{tx: msg.Payload().(transactions.ContractCall), received: time.Now(), size: uint(len(msg.Id())), kadHeight: h}
t := TxDesc{
tx: msg.Payload().(transactions.ContractCall),
received: time.Now(),
size: uint(len(msg.Id())),
kadHeight: h,
}

start := time.Now()
txid, err := m.processTx(t)
Expand Down Expand Up @@ -234,33 +309,14 @@ func (m *Mempool) processTx(t TxDesc) ([]byte, error) {
return txid, fmt.Errorf("store err - %v", err)
}

// try to (re)propagate transaction in both gossip and kadcast networks
m.propagateTx(t, txid)
// queue transaction for (re)propagation
go func() {
m.pendingPropagation <- t
}()

return txid, nil
}

// propagateTx (re)-propagate tx in gossip or kadcast network but not in both.
func (m *Mempool) propagateTx(t TxDesc, txid []byte) {
if config.Get().Kadcast.Enabled {
// Kadcast complete transaction data
if err := m.kadcastTx(t); err != nil {
log.
WithError(err).
WithField("txid", txid).
Error("kadcast propagation failed")
}
} else {
// Advertise the transaction hash to gossip network via "Inventory Vectors"
if err := m.advertiseTx(txid); err != nil {
log.
WithError(err).
WithField("txid", txid).
Error("gossip propagation failed")
}
}
}

func (m *Mempool) onBlock(b block.Block) {
m.latestBlockTimestamp = b.Header.Timestamp
m.removeAccepted(b)
Expand Down

0 comments on commit 8a0b983

Please sign in to comment.