Skip to content

Commit

Permalink
Increase virtualChangeChan to 100e3 (kaspanet#2056)
Browse files Browse the repository at this point in the history
* Increase virtualChangeChan to 100e3
Don't crash when sending UTXO RPC notification to a closed route
Throw error if virtualChangeChan is full

* Use MaybeEnqueue in more places

* Remove comment

* Ignore capacity reached errors on MaybeEnqueue
  • Loading branch information
someone235 authored May 20, 2022
1 parent 48c7fa0 commit 4dd7113
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 11 deletions.
5 changes: 0 additions & 5 deletions app/rpc/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ func (m *Manager) notifyVirtualChange(virtualChangeSet *externalapi.VirtualChang
onEnd := logger.LogAndMeasureExecutionTime(log, "RPCManager.NotifyVirtualChange")
defer onEnd()

/*
NOTE: nothing under this function is allowed to acquire the consensus lock, since
the function is triggered by a channel call under consensus lock which might block
*/

if m.context.Config.UTXOIndex && virtualChangeSet.VirtualUTXODiff != nil {
err := m.notifyUTXOsChanged(virtualChangeSet)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions app/rpc/rpccontext/notificationmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func (nm *NotificationManager) NotifyVirtualSelectedParentChainChanged(
var err error

if listener.includeAcceptedTransactionIDsInVirtualSelectedParentChainChangedNotifications {
err = router.OutgoingRoute().Enqueue(notification)
err = router.OutgoingRoute().MaybeEnqueue(notification)
} else {
err = router.OutgoingRoute().Enqueue(notificationWithoutAcceptedTransactionIDs)
err = router.OutgoingRoute().MaybeEnqueue(notificationWithoutAcceptedTransactionIDs)
}

if err != nil {
Expand Down Expand Up @@ -206,7 +206,7 @@ func (nm *NotificationManager) NotifyUTXOsChanged(utxoChanges *utxoindex.UTXOCha
}

// Enqueue the notification
err = router.OutgoingRoute().Enqueue(notification)
err = router.OutgoingRoute().MaybeEnqueue(notification)
if err != nil {
return err
}
Expand All @@ -225,7 +225,7 @@ func (nm *NotificationManager) NotifyVirtualSelectedParentBlueScoreChanged(

for router, listener := range nm.listeners {
if listener.propagateVirtualSelectedParentBlueScoreChangedNotifications {
err := router.OutgoingRoute().Enqueue(notification)
err := router.OutgoingRoute().MaybeEnqueue(notification)
if err != nil {
return err
}
Expand All @@ -244,7 +244,7 @@ func (nm *NotificationManager) NotifyVirtualDaaScoreChanged(

for router, listener := range nm.listeners {
if listener.propagateVirtualDaaScoreChangedNotifications {
err := router.OutgoingRoute().Enqueue(notification)
err := router.OutgoingRoute().MaybeEnqueue(notification)
if err != nil {
return err
}
Expand Down
4 changes: 4 additions & 0 deletions domain/consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (s *consensus) onVirtualChange(virtualChangeSet *externalapi.VirtualChangeS
return nil
}

if len(s.virtualChangeChan) == cap(s.virtualChangeChan) {
return errors.Errorf("virtualChangeChan is full")
}

stagingArea := model.NewStagingArea()
virtualGHOSTDAGData, err := s.ghostdagDataStores[0].Get(s.databaseContext, stagingArea, model.VirtualBlockHash, false)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func New(consensusConfig *consensus.Config, mempoolConfig *mempool.Config, db in
}
}

virtualChangeChan := make(chan *externalapi.VirtualChangeSet, 1000)
virtualChangeChan := make(chan *externalapi.VirtualChangeSet, 100e3)
consensusFactory := consensus.NewFactory()
consensusInstance, shouldMigrate, err := consensusFactory.NewConsensus(consensusConfig, db, activePrefix, virtualChangeChan)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions infrastructure/network/netadapter/router/log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package router

import (
"github.com/kaspanet/kaspad/infrastructure/logger"
)

var log = logger.RegisterSubSystem("ROUT")
17 changes: 17 additions & 0 deletions infrastructure/network/netadapter/router/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ func (r *Route) Enqueue(message appmessage.Message) error {
return nil
}

// MaybeEnqueue enqueues a message to the route, but doesn't throw an error
// if it's closed or its capacity has been reached.
func (r *Route) MaybeEnqueue(message appmessage.Message) error {
err := r.Enqueue(message)
if errors.Is(err, ErrRouteClosed) {
log.Infof("Couldn't send message to closed route '%s'", r.name)
return nil
}

if errors.Is(err, ErrRouteCapacityReached) {
log.Infof("Capacity (%d) of route '%s' has been reached. Couldn't send message", r.capacity, r.name)
return nil
}

return err
}

// Dequeue dequeues a message from the Route
func (r *Route) Dequeue() (appmessage.Message, error) {
message, isOpen := <-r.channel
Expand Down

0 comments on commit 4dd7113

Please sign in to comment.