Skip to content

Commit

Permalink
Use a channel for utxo change events (kaspanet#2052)
Browse files Browse the repository at this point in the history
* Use a channel from within consensus in order to raise change events in order -- note that this is only a draft commit for discussion

* Fix compilation

* Check for nil

* Allow nil virtualChangeChan

* Remove redundant comments

* Call notifyVirtualChange instead of notifyUTXOsChanged

* Remove redundant comment

* Add a separate function for initVirtualChangeHandler

* Remove redundant type

* Check for nil in the right place

* Fix integration test

* Add data to virtual changeset and cleanup block added event logic

* Renames

* Comment

Co-authored-by: Ori Newman <[email protected]>
  • Loading branch information
michaelsutton and someone235 authored May 19, 2022
1 parent 5d24e2a commit 016ddfd
Show file tree
Hide file tree
Showing 16 changed files with 176 additions and 122 deletions.
7 changes: 5 additions & 2 deletions app/component_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"fmt"
"github.com/kaspanet/kaspad/domain/consensus/model/externalapi"
"sync/atomic"

"github.com/kaspanet/kaspad/domain/miningmanager/mempool"
Expand Down Expand Up @@ -67,6 +68,7 @@ func (a *ComponentManager) Stop() {
}

a.protocolManager.Close()
close(a.protocolManager.Context().Domain().VirtualChangeChannel())

return
}
Expand Down Expand Up @@ -118,7 +120,7 @@ func NewComponentManager(cfg *config.Config, db infrastructuredatabase.Database,
if err != nil {
return nil, err
}
rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, interrupt)
rpcManager := setupRPC(cfg, domain, netAdapter, protocolManager, connectionManager, addressManager, utxoIndex, domain.VirtualChangeChannel(), interrupt)

return &ComponentManager{
cfg: cfg,
Expand All @@ -139,6 +141,7 @@ func setupRPC(
connectionManager *connmanager.ConnectionManager,
addressManager *addressmanager.AddressManager,
utxoIndex *utxoindex.UTXOIndex,
virtualChangeChan chan *externalapi.VirtualChangeSet,
shutDownChan chan<- struct{},
) *rpc.Manager {

Expand All @@ -150,9 +153,9 @@ func setupRPC(
connectionManager,
addressManager,
utxoIndex,
virtualChangeChan,
shutDownChan,
)
protocolManager.SetOnVirtualChange(rpcManager.NotifyVirtualChange)
protocolManager.SetOnNewBlockTemplateHandler(rpcManager.NotifyNewBlockTemplate)
protocolManager.SetOnBlockAddedToDAGHandler(rpcManager.NotifyBlockAddedToDAG)
protocolManager.SetOnPruningPointUTXOSetOverrideHandler(rpcManager.NotifyPruningPointUTXOSetOverride)
Expand Down
31 changes: 8 additions & 23 deletions app/protocol/flowcontext/blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,24 @@ import (
// OnNewBlock updates the mempool after a new block arrival, and
// relays newly unorphaned transactions and possibly rebroadcast
// manually added transactions when not in IBD.
func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock,
virtualChangeSet *externalapi.VirtualChangeSet) error {
func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock) error {

hash := consensushashing.BlockHash(block)
log.Tracef("OnNewBlock start for block %s", hash)
defer log.Tracef("OnNewBlock end for block %s", hash)

unorphaningResults, err := f.UnorphanBlocks(block)
unorphanedBlocks, err := f.UnorphanBlocks(block)
if err != nil {
return err
}

log.Debugf("OnNewBlock: block %s unorphaned %d blocks", hash, len(unorphaningResults))
log.Debugf("OnNewBlock: block %s unorphaned %d blocks", hash, len(unorphanedBlocks))

newBlocks := []*externalapi.DomainBlock{block}
newVirtualChangeSets := []*externalapi.VirtualChangeSet{virtualChangeSet}
for _, unorphaningResult := range unorphaningResults {
newBlocks = append(newBlocks, unorphaningResult.block)
newVirtualChangeSets = append(newVirtualChangeSets, unorphaningResult.virtualChangeSet)
}
newBlocks = append(newBlocks, unorphanedBlocks...)

allAcceptedTransactions := make([]*externalapi.DomainTransaction, 0)
for i, newBlock := range newBlocks {
for _, newBlock := range newBlocks {
log.Debugf("OnNewBlock: passing block %s transactions to mining manager", hash)
acceptedTransactions, err := f.Domain().MiningManager().HandleNewBlockTransactions(newBlock.Transactions)
if err != nil {
Expand All @@ -48,8 +43,7 @@ func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock,

if f.onBlockAddedToDAGHandler != nil {
log.Debugf("OnNewBlock: calling f.onBlockAddedToDAGHandler for block %s", hash)
virtualChangeSet = newVirtualChangeSets[i]
err := f.onBlockAddedToDAGHandler(newBlock, virtualChangeSet)
err := f.onBlockAddedToDAGHandler(newBlock)
if err != nil {
return err
}
Expand All @@ -59,15 +53,6 @@ func (f *FlowContext) OnNewBlock(block *externalapi.DomainBlock,
return f.broadcastTransactionsAfterBlockAdded(newBlocks, allAcceptedTransactions)
}

// OnVirtualChange calls the handler function whenever the virtual block changes.
func (f *FlowContext) OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error {
if f.onVirtualChangeHandler != nil && virtualChangeSet != nil {
return f.onVirtualChangeHandler(virtualChangeSet)
}

return nil
}

// OnNewBlockTemplate calls the handler function whenever a new block template is available for miners.
func (f *FlowContext) OnNewBlockTemplate() error {
// Clear current template cache. Note we call this even if the handler is nil, in order to keep the
Expand Down Expand Up @@ -130,7 +115,7 @@ func (f *FlowContext) AddBlock(block *externalapi.DomainBlock) error {
return protocolerrors.Errorf(false, "cannot add header only block")
}

virtualChangeSet, err := f.Domain().Consensus().ValidateAndInsertBlock(block, true)
_, err := f.Domain().Consensus().ValidateAndInsertBlock(block, true)
if err != nil {
if errors.As(err, &ruleerrors.RuleError{}) {
log.Warnf("Validation failed for block %s: %s", consensushashing.BlockHash(block), err)
Expand All @@ -141,7 +126,7 @@ func (f *FlowContext) AddBlock(block *externalapi.DomainBlock) error {
if err != nil {
return err
}
err = f.OnNewBlock(block, virtualChangeSet)
err = f.OnNewBlock(block)
if err != nil {
return err
}
Expand Down
11 changes: 1 addition & 10 deletions app/protocol/flowcontext/flow_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ import (

// OnBlockAddedToDAGHandler is a handler function that's triggered
// when a block is added to the DAG
type OnBlockAddedToDAGHandler func(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error

// OnVirtualChangeHandler is a handler function that's triggered when the virtual changes
type OnVirtualChangeHandler func(virtualChangeSet *externalapi.VirtualChangeSet) error
type OnBlockAddedToDAGHandler func(block *externalapi.DomainBlock) error

// OnNewBlockTemplateHandler is a handler function that's triggered when a new block template is available
type OnNewBlockTemplateHandler func() error
Expand All @@ -47,7 +44,6 @@ type FlowContext struct {

timeStarted int64

onVirtualChangeHandler OnVirtualChangeHandler
onBlockAddedToDAGHandler OnBlockAddedToDAGHandler
onNewBlockTemplateHandler OnNewBlockTemplateHandler
onPruningPointUTXOSetOverrideHandler OnPruningPointUTXOSetOverrideHandler
Expand Down Expand Up @@ -111,11 +107,6 @@ func (f *FlowContext) IsNearlySynced() (bool, error) {
return f.Domain().Consensus().IsNearlySynced()
}

// SetOnVirtualChangeHandler sets the onVirtualChangeHandler handler
func (f *FlowContext) SetOnVirtualChangeHandler(onVirtualChangeHandler OnVirtualChangeHandler) {
f.onVirtualChangeHandler = onVirtualChangeHandler
}

// SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler
func (f *FlowContext) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler OnBlockAddedToDAGHandler) {
f.onBlockAddedToDAGHandler = onBlockAddedToDAGHandler
Expand Down
31 changes: 11 additions & 20 deletions app/protocol/flowcontext/orphans.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,6 @@ import (
// on: 2^orphanResolutionRange * PHANTOM K.
const maxOrphans = 600

// UnorphaningResult is the result of unorphaning a block
type UnorphaningResult struct {
block *externalapi.DomainBlock
virtualChangeSet *externalapi.VirtualChangeSet
}

// AddOrphan adds the block to the orphan set
func (f *FlowContext) AddOrphan(orphanBlock *externalapi.DomainBlock) {
f.orphansMutex.Lock()
Expand Down Expand Up @@ -57,7 +51,7 @@ func (f *FlowContext) IsOrphan(blockHash *externalapi.DomainHash) bool {
}

// UnorphanBlocks removes the block from the orphan set, and remove all of the blocks that are not orphans anymore.
func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*UnorphaningResult, error) {
func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*externalapi.DomainBlock, error) {
f.orphansMutex.Lock()
defer f.orphansMutex.Unlock()

Expand All @@ -66,7 +60,7 @@ func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*Uno
rootBlockHash := consensushashing.BlockHash(rootBlock)
processQueue := f.addChildOrphansToProcessQueue(rootBlockHash, []externalapi.DomainHash{})

var unorphaningResults []*UnorphaningResult
var unorphanedBlocks []*externalapi.DomainBlock
for len(processQueue) > 0 {
var orphanHash externalapi.DomainHash
orphanHash, processQueue = processQueue[0], processQueue[1:]
Expand All @@ -90,21 +84,18 @@ func (f *FlowContext) UnorphanBlocks(rootBlock *externalapi.DomainBlock) ([]*Uno
}
}
if canBeUnorphaned {
virtualChangeSet, unorphaningSucceeded, err := f.unorphanBlock(orphanHash)
unorphaningSucceeded, err := f.unorphanBlock(orphanHash)
if err != nil {
return nil, err
}
if unorphaningSucceeded {
unorphaningResults = append(unorphaningResults, &UnorphaningResult{
block: orphanBlock,
virtualChangeSet: virtualChangeSet,
})
unorphanedBlocks = append(unorphanedBlocks, orphanBlock)
processQueue = f.addChildOrphansToProcessQueue(&orphanHash, processQueue)
}
}
}

return unorphaningResults, nil
return unorphanedBlocks, nil
}

// addChildOrphansToProcessQueue finds all child orphans of `blockHash`
Expand Down Expand Up @@ -143,24 +134,24 @@ func (f *FlowContext) findChildOrphansOfBlock(blockHash *externalapi.DomainHash)
return childOrphans
}

func (f *FlowContext) unorphanBlock(orphanHash externalapi.DomainHash) (*externalapi.VirtualChangeSet, bool, error) {
func (f *FlowContext) unorphanBlock(orphanHash externalapi.DomainHash) (bool, error) {
orphanBlock, ok := f.orphans[orphanHash]
if !ok {
return nil, false, errors.Errorf("attempted to unorphan a non-orphan block %s", orphanHash)
return false, errors.Errorf("attempted to unorphan a non-orphan block %s", orphanHash)
}
delete(f.orphans, orphanHash)

virtualChangeSet, err := f.domain.Consensus().ValidateAndInsertBlock(orphanBlock, true)
_, err := f.domain.Consensus().ValidateAndInsertBlock(orphanBlock, true)
if err != nil {
if errors.As(err, &ruleerrors.RuleError{}) {
log.Warnf("Validation failed for orphan block %s: %s", orphanHash, err)
return nil, false, nil
return false, nil
}
return nil, false, err
return false, err
}

log.Infof("Unorphaned block %s", orphanHash)
return virtualChangeSet, true, nil
return true, nil
}

// GetOrphanRoots returns the roots of the missing ancestors DAG of the given orphan
Expand Down
19 changes: 9 additions & 10 deletions app/protocol/flows/v5/blockrelay/handle_relay_invs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ var orphanResolutionRange uint32 = 5
type RelayInvsContext interface {
Domain() domain.Domain
Config() *config.Config
OnNewBlock(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error
OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error
OnNewBlock(block *externalapi.DomainBlock) error
OnNewBlockTemplate() error
OnPruningPointUTXOSetOverride() error
SharedRequestedBlocks() *flowcontext.SharedRequestedBlocks
Expand Down Expand Up @@ -174,7 +173,7 @@ func (flow *handleRelayInvsFlow) start() error {
if err != nil {
return err
}
missingParents, virtualChangeSet, err := flow.processBlock(block)
missingParents, err := flow.processBlock(block)
if err != nil {
if errors.Is(err, ruleerrors.ErrPrunedBlock) {
log.Infof("Ignoring pruned block %s", inv.Hash)
Expand Down Expand Up @@ -233,7 +232,7 @@ func (flow *handleRelayInvsFlow) start() error {
}

log.Infof("Accepted block %s via relay", inv.Hash)
err = flow.OnNewBlock(block, virtualChangeSet)
err = flow.OnNewBlock(block)
if err != nil {
return err
}
Expand Down Expand Up @@ -320,25 +319,25 @@ func (flow *handleRelayInvsFlow) readMsgBlock() (msgBlock *appmessage.MsgBlock,
}
}

func (flow *handleRelayInvsFlow) processBlock(block *externalapi.DomainBlock) ([]*externalapi.DomainHash, *externalapi.VirtualChangeSet, error) {
func (flow *handleRelayInvsFlow) processBlock(block *externalapi.DomainBlock) ([]*externalapi.DomainHash, error) {
blockHash := consensushashing.BlockHash(block)
virtualChangeSet, err := flow.Domain().Consensus().ValidateAndInsertBlock(block, true)
_, err := flow.Domain().Consensus().ValidateAndInsertBlock(block, true)
if err != nil {
if !errors.As(err, &ruleerrors.RuleError{}) {
return nil, nil, errors.Wrapf(err, "failed to process block %s", blockHash)
return nil, errors.Wrapf(err, "failed to process block %s", blockHash)
}

missingParentsError := &ruleerrors.ErrMissingParents{}
if errors.As(err, missingParentsError) {
return missingParentsError.MissingParentHashes, nil, nil
return missingParentsError.MissingParentHashes, nil
}
// A duplicate block should not appear to the user as a warning and is already reported in the calling function
if !errors.Is(err, ruleerrors.ErrDuplicateBlock) {
log.Warnf("Rejected block %s from %s: %s", blockHash, flow.peer, err)
}
return nil, nil, protocolerrors.Wrapf(true, err, "got invalid block %s from relay", blockHash)
return nil, protocolerrors.Wrapf(true, err, "got invalid block %s from relay", blockHash)
}
return nil, virtualChangeSet, nil
return nil, nil
}

func (flow *handleRelayInvsFlow) relayBlock(block *externalapi.DomainBlock) error {
Expand Down
14 changes: 4 additions & 10 deletions app/protocol/flows/v5/blockrelay/ibd.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ import (
type IBDContext interface {
Domain() domain.Domain
Config() *config.Config
OnNewBlock(block *externalapi.DomainBlock, virtualChangeSet *externalapi.VirtualChangeSet) error
OnVirtualChange(virtualChangeSet *externalapi.VirtualChangeSet) error
OnNewBlock(block *externalapi.DomainBlock) error
OnNewBlockTemplate() error
OnPruningPointUTXOSetOverride() error
IsIBDRunning() bool
Expand Down Expand Up @@ -655,15 +654,15 @@ func (flow *handleIBDFlow) syncMissingBlockBodies(highHash *externalapi.DomainHa
return err
}

virtualChangeSet, err := flow.Domain().Consensus().ValidateAndInsertBlock(block, false)
_, err = flow.Domain().Consensus().ValidateAndInsertBlock(block, false)
if err != nil {
if errors.Is(err, ruleerrors.ErrDuplicateBlock) {
log.Debugf("Skipping IBD Block %s as it has already been added to the DAG", blockHash)
continue
}
return protocolerrors.ConvertToBanningProtocolErrorIfRuleError(err, "invalid block %s", blockHash)
}
err = flow.OnNewBlock(block, virtualChangeSet)
err = flow.OnNewBlock(block)
if err != nil {
return err
}
Expand Down Expand Up @@ -706,12 +705,7 @@ func (flow *handleIBDFlow) resolveVirtual(estimatedVirtualDAAScoreTarget uint64)
}
log.Infof("Resolving virtual. Estimated progress: %d%%", percents)
}
virtualChangeSet, isCompletelyResolved, err := flow.Domain().Consensus().ResolveVirtual()
if err != nil {
return err
}

err = flow.OnVirtualChange(virtualChangeSet)
_, isCompletelyResolved, err := flow.Domain().Consensus().ResolveVirtual()
if err != nil {
return err
}
Expand Down
5 changes: 0 additions & 5 deletions app/protocol/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,6 @@ func (m *Manager) runFlows(flows []*common.Flow, peer *peerpkg.Peer, errChan <-c
return <-errChan
}

// SetOnVirtualChange sets the onVirtualChangeHandler handler
func (m *Manager) SetOnVirtualChange(onVirtualChangeHandler flowcontext.OnVirtualChangeHandler) {
m.context.SetOnVirtualChangeHandler(onVirtualChangeHandler)
}

// SetOnBlockAddedToDAGHandler sets the onBlockAddedToDAG handler
func (m *Manager) SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler flowcontext.OnBlockAddedToDAGHandler) {
m.context.SetOnBlockAddedToDAGHandler(onBlockAddedToDAGHandler)
Expand Down
Loading

0 comments on commit 016ddfd

Please sign in to comment.