Support both rate limiter and pendingPropagation in mempool #1238

Merged · 6 commits · Jan 13, 2022
Changes from all commits
2 changes: 2 additions & 0 deletions harness/engine/profiles.go
@@ -109,6 +109,8 @@ func Profile1(index int, node *DuskNode, walletPath string) {
viper.Set("mempool.poolType", "hashmap")
viper.Set("mempool.preallocTxs", "100")
viper.Set("mempool.maxInvItems", "10000")
viper.Set("mempool.propagateTimeout", "100ms")
viper.Set("mempool.propagateBurst", 1)

viper.Set("consensus.defaultlocktime", 1000)
viper.Set("consensus.defaultoffset", 10)
3 changes: 1 addition & 2 deletions harness/engine/utils.go
@@ -569,8 +569,7 @@ func (n *Network) BatchSendTransferTx(t *testing.T, senderNodeInd uint, batchSize
for i := uint(0); i < batchSize; i++ {
req := pb.TransferRequest{Amount: amount, Address: pubKey, Fee: fee}

clientDeadline := time.Now().Add(timeout)
ctx, cancel := context.WithDeadline(context.Background(), clientDeadline)
ctx, cancel := context.WithTimeout(context.Background(), timeout)

_, err := client.Transfer(ctx, &req)
if err != nil {
46 changes: 24 additions & 22 deletions harness/tests/localnet_test.go
@@ -305,31 +305,33 @@ func TestMeasureNetworkTPS(t *testing.T) {
}

// Wait until first two blocks are accepted
localNet.WaitUntil(t, 0, 2, 2*time.Minute, 5*time.Second)
go func() {
localNet.WaitUntil(t, 0, 30, 5*time.Minute, 5*time.Second)

// Transaction Load test.
// Each of the nodes in the network starts sending transfer txs up to batchSize
batchSizeEnv, _ := os.LookupEnv("DUSK_TX_BATCH_SIZE")
batchSize, _ := strconv.Atoi(batchSizeEnv)
// Transaction Load test.
// Each of the nodes in the network starts sending transfer txs up to batchSize
batchSizeEnv, _ := os.LookupEnv("DUSK_TX_BATCH_SIZE")
batchSize, _ := strconv.Atoi(batchSizeEnv)

if batchSize == 0 {
batchSize = 300
}
if batchSize == 0 {
batchSize = 300
}

for i := uint(0); i < uint(localNet.Size()); i++ {
logrus.WithField("node_id", i).WithField("batch_size", batchSize).
Info("start sending transfer txs ...")

// Start concurrently flooding the network with batch of Transfer transactions
go func(ind uint) {
if err := localNet.BatchSendTransferTx(t, ind, uint(batchSize), 100, 10, time.Minute); err != nil {
logrus.Error(err)
}

logrus.WithField("node_id", ind).WithField("batch_size", batchSize).
Info("batch of transfer txs completed")
}(i)
}
for i := uint(0); i < uint(localNet.Size()); i++ {
logrus.WithField("node_id", i).WithField("batch_size", batchSize).
Info("start sending transfer txs ...")

// Start concurrently flooding the network with batch of Transfer transactions
go func(ind uint) {
if err := localNet.BatchSendTransferTx(t, ind, uint(batchSize), 100, 10, 10*time.Minute); err != nil {
logrus.Error(err)
}

logrus.WithField("node_id", ind).WithField("batch_size", batchSize).
Info("batch of transfer txs completed")
}(i)
}
}()

// Start monitoring TPS metric of the network
localNet.MonitorTPS(4 * time.Second)
10 changes: 6 additions & 4 deletions pkg/config/groups.go
@@ -169,10 +169,12 @@ type performanceConfiguration struct {
}

type mempoolConfiguration struct {
MaxSizeMB uint32
PoolType string
PreallocTxs uint32
MaxInvItems uint32
MaxSizeMB uint32
PoolType string
PreallocTxs uint32
MaxInvItems uint32
PropagateTimeout string
PropagateBurst uint32
}

type consensusConfiguration struct {
3 changes: 3 additions & 0 deletions pkg/config/samples/default.dusk.toml
@@ -119,6 +119,9 @@ preallocTxs = 100
# Max number of items to respond with on topics.Mempool request
# To disable topics.Mempool handling, set it to 0
maxInvItems = 10000
# Back pressure on transaction propagation
propagateTimeout = "100ms"
propagateBurst = 1

# gRPC API service
[rpc]
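For context (an editorial sketch, not part of this diff): the two new settings feed a `golang.org/x/time/rate` token bucket, mirroring the parsing done in `NewMempool` below. With `propagateTimeout = "100ms"` and `propagateBurst = 1`, the mempool propagates at most one transaction per 100ms, with no burst headroom beyond a single token. A minimal, runnable demonstration:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Values taken from the sample config above.
	timeout, err := time.ParseDuration("100ms")
	if err != nil {
		panic(err)
	}

	// rate.Every(100ms) refills one token every 100ms; burst 1 means at
	// most one token can be held, so sends are spaced ~100ms apart.
	limiter := rate.NewLimiter(rate.Every(timeout), 1)

	start := time.Now()
	for i := 0; i < 3; i++ {
		// Wait blocks until a token is available or the context is cancelled.
		if err := limiter.Wait(context.Background()); err != nil {
			fmt.Println("wait:", err)
			return
		}
		fmt.Printf("propagation %d at %v\n", i, time.Since(start).Round(time.Millisecond))
	}
}
```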
166 changes: 111 additions & 55 deletions pkg/core/mempool/mempool.go
@@ -17,6 +17,7 @@ import (
"time"

"github.com/dusk-network/dusk-blockchain/pkg/util/diagnostics"
"golang.org/x/time/rate"

"github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus"
@@ -55,6 +56,8 @@ type Mempool struct {
// verified txs to be included in next block.
verified Pool

pendingPropagation chan TxDesc

// the collector to listen for new accepted blocks.
acceptedBlockChan <-chan block.Block

@@ -65,6 +68,8 @@

// the magic function that knows best what is valid chain Tx.
verifier transactions.UnconfirmedTxProber

limiter *rate.Limiter
}

// checkTx is responsible to determine if a tx is valid or not.
@@ -86,6 +91,9 @@ func (m *Mempool) checkTx(tx transactions.ContractCall) error {
func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier transactions.UnconfirmedTxProber, srv *grpc.Server) *Mempool {
log.Infof("create instance")

l := log.WithField("backend_type", config.Get().Mempool.PoolType).
WithField("max_size_mb", config.Get().Mempool.MaxSizeMB)

getMempoolTxsChan := make(chan rpcbus.Request, 1)
if err := rpcBus.Register(topics.GetMempoolTxs, getMempoolTxsChan); err != nil {
log.WithError(err).Error("failed to register topics.GetMempoolTxs")
@@ -103,6 +111,28 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier transactions.UnconfirmedTxProber, srv *grpc.Server) *Mempool {

acceptedBlockChan, _ := consensus.InitAcceptedBlockUpdate(eventBus)

// Enable rate limiter from config
cfg := config.Get().Mempool

var limiter *rate.Limiter

if len(cfg.PropagateTimeout) > 0 {
timeout, err := time.ParseDuration(cfg.PropagateTimeout)
if err != nil {
log.WithError(err).Fatal("could not parse mempool propagation timeout")
}

burst := cfg.PropagateBurst
if burst == 0 {
burst = 1
}

limiter = rate.NewLimiter(rate.Every(timeout), int(burst))

l = l.WithField("propagate_timeout", cfg.PropagateTimeout).
WithField("propagate_burst", burst)
}

m := &Mempool{
eventBus: eventBus,
latestBlockTimestamp: math.MinInt32,
@@ -111,13 +141,15 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier transactions.UnconfirmedTxProber, srv *grpc.Server) *Mempool {
getMempoolTxsBySizeChan: getMempoolTxsBySizeChan,
sendTxChan: sendTxChan,
verifier: verifier,
limiter: limiter,
pendingPropagation: make(chan TxDesc, 1000),
}

// Setting the pool where to cache verified transactions.
// The pool is normally a Hashmap
m.verified = m.newPool()

log.WithField("type", config.Get().Mempool.PoolType).Info("running")
l.Info("running")

if srv != nil {
node.RegisterMempoolServer(srv, m)
@@ -126,42 +158,80 @@ func NewMempool(eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus, verifier transactions.UnconfirmedTxProber, srv *grpc.Server) *Mempool {
return m
}

// Run spawns the mempool lifecycle routine. The whole mempool cycle is around
// getting input from the outside world (from input channels) and provide the
// actual list of the verified txs (onto output channel).
//
// All operations are always executed in a single go-routine so no
// protection-by-mutex needed.
// Run spawns the mempool lifecycle routines.
func (m *Mempool) Run(ctx context.Context) {
go func() {
ticker := time.NewTicker(idleTime)
defer ticker.Stop()

for {
select {
// rpcbus methods.
case r := <-m.sendTxChan:
go handleRequest(r, m.processSendMempoolTxRequest, "SendTx")
case r := <-m.getMempoolTxsChan:
handleRequest(r, m.processGetMempoolTxsRequest, "GetMempoolTxs")
case r := <-m.getMempoolTxsBySizeChan:
handleRequest(r, m.processGetMempoolTxsBySizeRequest, "GetMempoolTxsBySize")
case b := <-m.acceptedBlockChan:
m.onBlock(b)
case <-ticker.C:
m.onIdle()
// Mempool terminating.
case <-ctx.Done():
// m.eventBus.Unsubscribe(topics.Tx, m.txSubscriberID)
return
// Main Loop
go m.Loop(ctx)

// Loop to drain pendingPropagation and try to propagate transactions
go m.propagateLoop(ctx)
}

// Loop listens for GetMempoolTxs request and topics.AcceptedBlock events.
func (m *Mempool) Loop(ctx context.Context) {
ticker := time.NewTicker(idleTime)
defer ticker.Stop()

for {
select {
// rpcbus methods.
case r := <-m.sendTxChan:
// TODO: This handler should be deleted once new wallet is integrated
go handleRequest(r, m.processSendMempoolTxRequest, "SendTx")
case r := <-m.getMempoolTxsChan:
handleRequest(r, m.processGetMempoolTxsRequest, "GetMempoolTxs")
case r := <-m.getMempoolTxsBySizeChan:
handleRequest(r, m.processGetMempoolTxsBySizeRequest, "GetMempoolTxsBySize")
case b := <-m.acceptedBlockChan:
m.onBlock(b)
case <-ticker.C:
m.onIdle()
case <-ctx.Done():
// Mempool terminating
return
}

ticker.Reset(idleTime)
}
}

func (m *Mempool) propagateLoop(ctx context.Context) {
for {
select {
case t := <-m.pendingPropagation:
// Ensure we propagate at the configured rate
if m.limiter != nil {
if err := m.limiter.Wait(ctx); err != nil {
log.WithError(err).Error("failed to limit rate")
}
}

txid, err := t.tx.CalculateHash()
if err != nil {
log.WithError(err).Error("failed to calc hash")
continue
}

ticker.Reset(idleTime)
if config.Get().Kadcast.Enabled {
// Broadcast full transaction data in kadcast
err = m.kadcastTx(t)
} else {
// Advertise the transaction hash to gossip network via "Inventory Vectors"
err = m.advertiseTx(txid)
}

if err != nil {
log.WithField("txid", hex.EncodeToString(txid)).WithError(err).Error("failed to propagate")
}

// Mempool terminating
case <-ctx.Done():
return
}
}()
}
}
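A behavioral note on the loop above: `limiter.Wait(ctx)` returns an error once `ctx` is cancelled, but the loop only logs that error and still attempts one more propagation before the `<-ctx.Done()` case fires on a later iteration. A small, self-contained sketch of the failure mode (editorial, not part of the PR):

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	limiter := rate.NewLimiter(rate.Every(100*time.Millisecond), 1)

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate mempool shutdown

	// Wait fails fast on a cancelled context; propagateLoop logs this
	// error and proceeds with the current tx rather than returning.
	if err := limiter.Wait(ctx); err != nil {
		fmt.Println("wait failed:", err)
	}
}
```

A hypothetical tightening would be to `continue` (skip the send) when `Wait` fails, so a cancelled context never triggers an unthrottled propagation.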

// ProcessTx handles a submitted tx from any source (rpcBus or eventBus).
// ProcessTx processes a Transaction wire message.
func (m *Mempool) ProcessTx(srcPeerID string, msg message.Message) ([]bytes.Buffer, error) {
maxSizeBytes := config.Get().Mempool.MaxSizeMB * 1000 * 1000
if m.verified.Size() > maxSizeBytes {
@@ -176,7 +246,12 @@ func (m *Mempool) ProcessTx(srcPeerID string, msg message.Message) ([]bytes.Buffer, error) {
h = msg.Header()[0]
}

t := TxDesc{tx: msg.Payload().(transactions.ContractCall), received: time.Now(), size: uint(len(msg.Id())), kadHeight: h}
t := TxDesc{
tx: msg.Payload().(transactions.ContractCall),
received: time.Now(),
size: uint(len(msg.Id())),
kadHeight: h,
}

start := time.Now()
txid, err := m.processTx(t)
@@ -234,33 +309,14 @@ func (m *Mempool) processTx(t TxDesc) ([]byte, error) {
return txid, fmt.Errorf("store err - %v", err)
}

// try to (re)propagate transaction in both gossip and kadcast networks
m.propagateTx(t, txid)
// queue transaction for (re)propagation
go func() {
m.pendingPropagation <- t
}()

return txid, nil
}
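Design note on the enqueue above: wrapping the send in `go func() { m.pendingPropagation <- t }()` keeps `processTx` from ever blocking when the 1000-slot buffer is full, at the cost of parking one goroutine per queued tx under sustained saturation. A hypothetical alternative (not what this PR does) is a non-blocking send that drops and reports on saturation, bounding memory instead; a runnable toy illustration with a 3-slot queue:

```go
package main

import "fmt"

// TxDesc stands in for the mempool's transaction descriptor.
type TxDesc struct{ id int }

func main() {
	pending := make(chan TxDesc, 3) // the PR uses capacity 1000

	for i := 0; i < 5; i++ {
		t := TxDesc{id: i}
		// Non-blocking send: enqueue if there is room, otherwise drop
		// (and report) rather than spawning a goroutine per tx.
		select {
		case pending <- t:
		default:
			fmt.Printf("queue full, dropping tx %d\n", t.id)
		}
	}
	fmt.Println("queued:", len(pending))
}
```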

// propagateTx (re)-propagate tx in gossip or kadcast network but not in both.
func (m *Mempool) propagateTx(t TxDesc, txid []byte) {
if config.Get().Kadcast.Enabled {
// Kadcast complete transaction data
if err := m.kadcastTx(t); err != nil {
log.
WithError(err).
WithField("txid", txid).
Error("kadcast propagation failed")
}
} else {
// Advertise the transaction hash to gossip network via "Inventory Vectors"
if err := m.advertiseTx(txid); err != nil {
log.
WithError(err).
WithField("txid", txid).
Error("gossip propagation failed")
}
}
}

func (m *Mempool) onBlock(b block.Block) {
m.latestBlockTimestamp = b.Header.Timestamp
m.removeAccepted(b)